import os import re import struct import shutil import itunespy import datetime import urllib.request from tempfile import NamedTemporaryFile import beets.mediafile as mediafile from beets import config from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance from beets.autotag.match import tag_album from beets.plugins import BeetsPlugin, sanitize_choices from beets.dbcore import types from beets.ui.commands import PromptChoice AVAILABLE_COUNTRIES = ['DE', 'US', 'GB', 'FR', 'AU', 'CA', 'IT', 'JP', 'DZ', 'AO', 'AI', 'AG', 'AR', 'AM', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB', 'BY', 'BE', 'BZ', 'BM', 'BO', 'BW', 'BR', 'BN', 'BG', 'CM', 'KY', 'CL', 'CN', 'CO', 'CR', 'CI', 'HR', 'CY', 'CZ', 'DK', 'DM', 'DO', 'EC', 'EG', 'SV', 'EE', 'ET', 'FI', 'GH', 'GR', 'GD', 'GT', 'GY', 'HN', 'HK', 'HU', 'IS', 'IN', 'ID', 'IE', 'IL', 'JM', 'JO', 'KZ', 'KE', 'KR', 'KW', 'LV', 'LB', 'LY', 'LI', 'LT', 'LU', 'MO', 'MK', 'MG', 'MY', 'MV', 'ML', 'MT', 'MU', 'MX', 'MD', 'MS', 'MM', 'NP', 'NL', 'NZ', 'NI', 'NE', 'NG', 'NO', 'OM', 'PK', 'PA', 'PY', 'PE', 'PH', 'PL', 'PT', 'QA', 'RO', 'RU', 'KN', 'LC', 'VC', 'SA', 'SN', 'RS', 'SG', 'SK', 'SI', 'ZA', 'ES', 'LK', 'SR', 'SE', 'CH', 'TW', 'TZ', 'TH', 'TT', 'TN', 'TR', 'TC', 'UG', 'UA', 'AE', 'UY', 'UZ', 'VE', 'VN', 'VG', 'YE'] VA_ARTIST_ID = 120644327 _track_ids = {} class iTunesPlugin(BeetsPlugin): """A beets plugin that adds the iTunes API as a metadata source. The plugin can also lookup song metadata of songs the have been matched by iTunes Match. """ def __init__(self): super(iTunesPlugin, self).__init__() self.add_media_field('release_date', mediafile.MediaField( mediafile.MP3StorageStyle('TDRC'), mediafile.MP4StorageStyle('\xa9day'), mediafile.StorageStyle('DATE'), mediafile.ASFStorageStyle('WM/Year') )) self.add_media_field('title_sort', mediafile.MediaField( mediafile.MP3StorageStyle('TSOT'), mediafile.MP4StorageStyle('sonm'), mediafile.StorageStyle('TITLESORT'), mediafile.ASFStorageStyle('WM/TitleSortOrder') )) self.add_media_field('album_sort', mediafile.MediaField( mediafile.MP3StorageStyle('TSOA'), mediafile.MP4StorageStyle('soal'), mediafile.StorageStyle('ALBUMSORT'), mediafile.ASFStorageStyle('WM/AlbumSortOrder') )) self.add_media_field('copyright', mediafile.MediaField( mediafile.MP3StorageStyle('TCOP'), mediafile.MP4StorageStyle('cprt'), mediafile.StorageStyle('COPYRIGHT'), mediafile.ASFStorageStyle('Copyright') )) self.add_media_field('itunes_content_id', mediafile.MediaField( mediafile.MP4StorageStyle('cnID', as_type=int), mediafile.StorageStyle(u'ITUNES_CONTENT_ID', as_type=int) )) self.add_media_field('itunes_artist_id', mediafile.MediaField( mediafile.MP4StorageStyle('atID', as_type=int), mediafile.StorageStyle(u'ITUNES_ARTIST_ID', as_type=int) )) self.add_media_field('itunes_advisory', mediafile.MediaField( mediafile.MP4StorageStyle('rtng', as_type=int), mediafile.StorageStyle('ITUNES_ADVISORY', as_type=int) )) self.add_media_field('itunes_media_type', mediafile.MediaField( mediafile.MP4StorageStyle('stik', as_type=int), mediafile.StorageStyle('ITUNES_MEDIA_TYPE', as_type=int) )) self.config.add({ 'search_countries': ['US'], 'lookup_countries': ['US', 'DE', 'AU', 'GB', 'FR', 'IT', 'JP'], 'search_limit': 5, 'sanitize_searches': True, # Whether to remove parenthesized and braced text from searches 'artwork': False, # Whether to also download artwork from iTunes. Does not work well with fetchart 'method': 'lookup', # Valid values: 'search', 'lookup', 'both', 'lookup (search fallback)' }) self.force_search = False if self.method in ['lookup', 'lookup (search fallback)', 'both']: self.register_listener('import_task_start', self.scan_track_ids) if self.method in ['lookup', 'lookup (search fallback)']: self.register_listener('before_choose_candidate', self.add_prompt_choices) self.register_listener('import_task_apply', self.apply_itunes_metadata) if self.use_itunes_artwork: self.import_stages += [ self.fetch_artwork ] self.register_listener('import_task_files', self.assign_artwork) @property def method(self): return self.config['method'].as_str() @property def use_itunes_artwork(self): return self.config['artwork'] @property def search_countries(self): countries = self.config['search_countries'].as_str_seq() return sanitize_choices(countries, AVAILABLE_COUNTRIES) @property def lookup_countries(self): countries = self.config['lookup_countries'].as_str_seq() return sanitize_choices(countries, AVAILABLE_COUNTRIES) @property def search_limit(self): return self.config['search_limit'].as_number() @property def sanitize_searches(self): return self.config['sanitize_searches'] def scan_track_ids(self, session, task): self._log.debug("Scanning track IDs") for item in task.items: with open(item.path, 'rb') as file: data = file.read(1000) # Song ID always seems to be around 600-700 bytes in location = data.find(b'song') if location < 0: continue file.seek(location + 4) song_id = struct.unpack('>i', file.read(4)) _track_ids[item] = song_id[0] self._log.debug("Found track ID {0} for {1}", song_id[0], item) def candidates(self, items, artist, album, va_likely): """Returns a list of AlbumInfo objects for iTunes search results matching an album and artist. """ force_search = self.force_search self.force_search = False try: candidates = [] # Lookup the track ids assigned by iTunes Match if self.method in ['lookup', 'lookup (search fallback)', 'both']: candidates += self.lookup_albums(items) # If the lookup was not successful ignore the ID for distance evaluation if self.method == 'lookup (search fallback)' and len(candidates) == 0: for item in items: _track_ids.pop(item, None) if force_search or self.method in ['search', 'both'] or (self.method == 'lookup (search fallback)' and len(candidates) == 0): candidates += self.search_albums(artist, album, va_likely) return candidates except ConnectionError: self._log.debug(u'Cannot search iTunes (no network connection)') return [] def lookup_albums(self, items): ids = [ str(_track_ids[item]) for item in items if item in _track_ids ] def remove_found_tracks(album): for track in album.get_tracks(): if track.track_id in ids: ids.remove(track.track_id) albums = [] for country in self.lookup_countries: self._log.debug("Searching iTunes Store: {0}", country) try: results = itunespy.lookup_album(','.join(ids), country=country) album_results = [ album for album in results if isinstance(album, itunespy.music_album.MusicAlbum) ] albums += [ self.make_album_info(album, True) for album in album_results ] # Remove IDs that have been found. If an album has been found for all tracks stop searching. for album in album_results: remove_found_tracks(album) if not ids: break except LookupError: pass return albums def search_albums(self, artist, album, va_likely): """Searches for matching albums on iTunes. """ if self.sanitize_searches: pattern = re.compile('(\s*(\(.*\)|\[.*\]|\{.*\}))*$') album = pattern.sub('', album) artist = pattern.sub('', artist) if va_likely: query = album else: query = '%s %s' % (artist, album) print("Searching", query) albums = [] for country in self.search_countries: try: results = itunespy.search_album(query, country=country, limit=self.search_limit) albums += [self.make_album_info(result) for result in results] except LookupError: self._log.debug(u'Search for {0} did return no results', query) return [] return albums def album_for_id(self, album_id): try: for country in self.lookup_countries: try: album = itunespy.lookup_album(album_id, country=country) return self.make_album_info(album, True) except LookupError: pass except ConnectionError: self._log.debug(u'Cannot search iTunes (no network connection)') return None self._log.debug(u'The lookup for an album with id {0} returned zero results.', album_id) return None def track_for_id(self, track_id): try: for country in self.lookup_countries: try: track = itunespy.lookup_track(track_id, country=country) return self.make_track_info(track) except LookupError: pass except ConnectionError: self._log.debug(u'Cannot search iTunes (no network connection)') return None self._log.debug(u'The lookup for a track with id {0} returned zero results.', track_id) return None def track_distance(self, item, info): dist = Distance() if item in _track_ids: dist.add_equality('track_id', _track_ids[item], getattr(info, 'itunes_content_id', None)) return dist def apply_itunes_metadata(self, session, task): for item in task.imported_items(): info = task.match.mapping[item] for tag in ['genre', 'title_sort', 'album_sprt', 'itunes_content_id', 'itunes_artist_id', 'itunes_advisory']: item[tag] = getattr(info, tag, None) if task.is_album: album_info = task.match.info for tag in ['copyright']: item[tag] = getattr(album_info, tag, None) # Fix Release Date item.release_date = "%04i-%02i-%02i" % (album_info.year, album_info.month, album_info.day) item.itunes_media_type = 1 # Set media type to "Music" def fetch_artwork(self, session, task): try: artwork_url = task.match.info.itunes_artwork_url self._log.debug(u"Fetching artwork from iTunes...") filename, extension = os.path.splitext(artwork_url) with urllib.request.urlopen(artwork_url) as response, NamedTemporaryFile(suffix=extension, delete=False) as tempfile: shutil.copyfileobj(response, tempfile) task.itunes_artwork_file = tempfile.name self._log.debug(u"iTunes artwork downloaded to {0}", task.artwork_file) except AttributeError: pass def assign_artwork(self, session, task): try: artwork_file = task.itunes_artwork_file task.album.set_art(artwork_file) task.album.art_source = "iTunes" task.album.store() os.remove(artwork_file) except AttributeError: pass def make_album_info(self, album, matched=False): tracks = [ self.make_track_info(track, index) for (index, track) in enumerate(album.get_tracks()) ] year = int(album.release_date[0:4]) month = int(album.release_date[5:7]) day = int(album.release_date[8:10]) info = AlbumInfo(album.collection_name, album.collection_id, album.artist_name, album.artist_id, tracks, asin = None, albumtype = None, va = album.artist_id == VA_ARTIST_ID, year = year, month = month, day = day, label = "iTunes Match" if matched else "iTunes Store", mediums = album.get_tracks()[0].disc_count, artist_sort = sort_string(album.artist_name), releasegroup_id = None, catalognum = None, script = "Unicode", language = None, country = album.country, albumstatus = None, media = None, albumdisambig = None, artist_credit = None, original_year = year, # In order to eliminate the unneccessary year penalty original_month = month, # See original_year original_day = day, # See original_year data_source = "iTunes", data_url = album.collection_view_url) info.album_sort = sort_string(album.collection_name) info.itunes_content_id = album.collection_id info.itunes_artist_id = album.artist_id info.genre = album.primary_genre_name info.itunes_advisory = decode_explicitness(album.collection_explicitness) info.itunes_artwork_url = decode_artwork_url(album.artwork_url_100) info.copyright = album.copyright return info def make_track_info(self, track, index): info = TrackInfo(track.track_name, track.track_id, artist = track.artist_name, artist_id = track.artist_id, length = track.track_time / 1000, index = index, medium = track.disc_number, medium_index = track.track_number, medium_total = track.track_count, artist_sort = sort_string(track.artist_name), disctitle = None, artist_credit = None, data_source = "iTunes", data_url = track.track_view_url, media = None, lyricist = None, composer = None, composer_sort = None, arranger = None, track_alt = None) info.title_sort = sort_string(track.track_name) info.album_sort = sort_string(track.collection_name) info.itunes_content_id = track.track_id info.itunes_artist_id = track.artist_id info.genre = track.primary_genre_name info.itunes_advisory = decode_explicitness(track.track_explicitness) info.itunes_artwork_url = decode_artwork_url(track.artwork_url_100) return info def add_prompt_choices(self, session, task): return [PromptChoice('c', 'searCh iTunes', self.prompt_search)] def prompt_search(self, session, task): self.force_search = True _, _, proposal = tag_album(task.items) return proposal def decode_explicitness(advisory_string): return ['notExplicit', 'explicit', 'cleaned'].index(advisory_string) def decode_artwork_url(url): return url.replace("100x100", "1500x1500") def sort_string(value): return re.sub('^(A |An |The |Der |Die |Das |Ein |Eine )', '', value, flags=re.IGNORECASE)