1
This repository has been archived on 2022-08-08. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
beets-plugins/beetsplug/itunes.py
Kim Wittenburg b969ddbf40 Fix ID Lookups
2018-09-04 17:41:38 +02:00

351 lines
16 KiB
Python

import os
import re
import struct
import shutil
import itunespy
import datetime
import urllib.request
from tempfile import NamedTemporaryFile
import beets.mediafile as mediafile
from beets import config
from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance
from beets.autotag.match import tag_album
from beets.plugins import BeetsPlugin, sanitize_choices
from beets.dbcore import types
from beets.importer import action
from beets.ui import print_, colorize
from beets.ui.commands import PromptChoice
# Two-letter storefront codes accepted for the `search_countries` and
# `lookup_countries` options; configured values are filtered against this list.
AVAILABLE_COUNTRIES = [
    'DE', 'US', 'GB', 'FR', 'AU', 'CA', 'IT', 'JP', 'DZ', 'AO',
    'AI', 'AG', 'AR', 'AM', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB',
    'BY', 'BE', 'BZ', 'BM', 'BO', 'BW', 'BR', 'BN', 'BG', 'CM',
    'KY', 'CL', 'CN', 'CO', 'CR', 'CI', 'HR', 'CY', 'CZ', 'DK',
    'DM', 'DO', 'EC', 'EG', 'SV', 'EE', 'ET', 'FI', 'GH', 'GR',
    'GD', 'GT', 'GY', 'HN', 'HK', 'HU', 'IS', 'IN', 'ID', 'IE',
    'IL', 'JM', 'JO', 'KZ', 'KE', 'KR', 'KW', 'LV', 'LB', 'LY',
    'LI', 'LT', 'LU', 'MO', 'MK', 'MG', 'MY', 'MV', 'ML', 'MT',
    'MU', 'MX', 'MD', 'MS', 'MM', 'NP', 'NL', 'NZ', 'NI', 'NE',
    'NG', 'NO', 'OM', 'PK', 'PA', 'PY', 'PE', 'PH', 'PL', 'PT',
    'QA', 'RO', 'RU', 'KN', 'LC', 'VC', 'SA', 'SN', 'RS', 'SG',
    'SK', 'SI', 'ZA', 'ES', 'LK', 'SR', 'SE', 'CH', 'TW', 'TZ',
    'TH', 'TT', 'TN', 'TR', 'TC', 'UG', 'UA', 'AE', 'UY', 'UZ',
    'VE', 'VN', 'VG', 'YE',
]

# Artist ID that marks an album as a "various artists" release
# (compared against `album.artist_id` when building album infos).
VA_ARTIST_ID = 120644327

# Maps imported items to the track ID found embedded in their file.
_track_ids = {}
class iTunesPlugin(BeetsPlugin):
    """A beets plugin that adds the iTunes API as a metadata source.

    The plugin can also look up the metadata of songs that have been
    matched by iTunes Match, using the track ID embedded in the files.
    """

    def __init__(self):
        """Register the extra media fields, config defaults and listeners."""
        super(iTunesPlugin, self).__init__()
        self.add_media_field('release_date', mediafile.MediaField(
            mediafile.MP3StorageStyle('TDRC'),
            mediafile.MP4StorageStyle('\xa9day'),
            mediafile.StorageStyle('DATE'),
            mediafile.ASFStorageStyle('WM/Year')
        ))
        self.add_media_field('title_sort', mediafile.MediaField(
            mediafile.MP3StorageStyle('TSOT'),
            mediafile.MP4StorageStyle('sonm'),
            mediafile.StorageStyle('TITLESORT'),
            mediafile.ASFStorageStyle('WM/TitleSortOrder')
        ))
        self.add_media_field('album_sort', mediafile.MediaField(
            mediafile.MP3StorageStyle('TSOA'),
            mediafile.MP4StorageStyle('soal'),
            mediafile.StorageStyle('ALBUMSORT'),
            mediafile.ASFStorageStyle('WM/AlbumSortOrder')
        ))
        self.add_media_field('copyright', mediafile.MediaField(
            mediafile.MP3StorageStyle('TCOP'),
            mediafile.MP4StorageStyle('cprt'),
            mediafile.StorageStyle('COPYRIGHT'),
            mediafile.ASFStorageStyle('Copyright')
        ))
        self.add_media_field('itunes_content_id', mediafile.MediaField(
            mediafile.MP4StorageStyle('cnID', as_type=int),
            mediafile.StorageStyle(u'ITUNES_CONTENT_ID', as_type=int)
        ))
        self.add_media_field('itunes_artist_id', mediafile.MediaField(
            mediafile.MP4StorageStyle('atID', as_type=int),
            mediafile.StorageStyle(u'ITUNES_ARTIST_ID', as_type=int)
        ))
        self.add_media_field('itunes_advisory', mediafile.MediaField(
            mediafile.MP4StorageStyle('rtng', as_type=int),
            mediafile.StorageStyle('ITUNES_ADVISORY', as_type=int)
        ))
        self.add_media_field('itunes_media_type', mediafile.MediaField(
            mediafile.MP4StorageStyle('stik', as_type=int),
            mediafile.StorageStyle('ITUNES_MEDIA_TYPE', as_type=int)
        ))

        self.config.add({
            'search_countries': ['US'],
            'lookup_countries': ['US', 'DE', 'AU', 'GB', 'FR', 'IT', 'JP'],
            'search_limit': 5,
            'sanitize_searches': True,  # Whether to remove parenthesized and braced text from searches
            'artwork': False,  # Whether to also download artwork from iTunes. Does not work well with fetchart
            'method': 'lookup',  # Valid values: 'search', 'lookup', 'both', 'lookup (search fallback)'
        })

        # Track IDs only need to be scanned when ID lookups are enabled.
        if self.method in ['lookup', 'lookup (search fallback)', 'both']:
            self.register_listener('import_task_start', self.scan_track_ids)
        self.register_listener('before_choose_candidate', self.add_prompt_choices)
        self.register_listener('import_task_apply', self.apply_itunes_metadata)
        if self.use_itunes_artwork:
            self.import_stages += [self.fetch_artwork]
            self.register_listener('import_task_files', self.assign_artwork)

    @property
    def method(self):
        """The configured matching method as a string."""
        return self.config['method'].as_str()

    @property
    def use_itunes_artwork(self):
        """Whether artwork should be downloaded from iTunes (a bool)."""
        return self.config['artwork'].get(bool)

    @property
    def search_countries(self):
        """Configured search countries, restricted to AVAILABLE_COUNTRIES."""
        countries = self.config['search_countries'].as_str_seq()
        return sanitize_choices(countries, AVAILABLE_COUNTRIES)

    @property
    def lookup_countries(self):
        """Configured lookup countries, restricted to AVAILABLE_COUNTRIES."""
        countries = self.config['lookup_countries'].as_str_seq()
        return sanitize_choices(countries, AVAILABLE_COUNTRIES)

    @property
    def search_limit(self):
        """Maximum number of results requested per search."""
        return self.config['search_limit'].as_number()

    @property
    def sanitize_searches(self):
        """Whether trailing bracketed text is stripped from queries (a bool)."""
        return self.config['sanitize_searches'].get(bool)

    def scan_track_ids(self, session, task):
        """Scan the files of `task` for an embedded track ID.

        The first 1000 bytes of each file are searched for the marker
        b'song'; the four bytes following it are read as a big-endian
        signed integer and remembered in the module-level `_track_ids`.
        Files without the marker are skipped.
        """
        self._log.debug("Scanning track IDs")
        for item in task.items:
            with open(item.path, 'rb') as file:
                # Song ID always seems to be around 600-700 bytes in
                data = file.read(1000)
                location = data.find(b'song')
                if location < 0:
                    continue
                file.seek(location + 4)
                raw_id = file.read(4)
            if len(raw_id) < 4:
                # Marker sits at the very end of the file: no ID to decode.
                continue
            song_id = struct.unpack('>i', raw_id)[0]
            _track_ids[item] = song_id
            self._log.debug("Found track ID {0} for {1}", song_id, item)

    def candidates(self, items, artist, album, va_likely):
        """Returns a list of AlbumInfo objects for iTunes search results
        matching an album and artist.

        Depending on the configured method the embedded track IDs are
        looked up, a text search is performed, or both.
        """
        # NOTE(review): only the built-in ConnectionError is handled here; if
        # itunespy raises a library-specific connection error it propagates.
        try:
            candidates = []
            # Lookup the track ids assigned by iTunes Match
            if self.method in ['lookup', 'lookup (search fallback)', 'both']:
                candidates += self.lookup_albums(items)
            fallback = self.method == 'lookup (search fallback)' and not candidates
            # If the lookup was not successful ignore the ID for distance evaluation
            if fallback:
                for item in items:
                    _track_ids.pop(item, None)
            if self.method in ['search', 'both'] or fallback:
                candidates += self.search_albums(artist, album, va_likely)
            return candidates
        except ConnectionError:
            self._log.debug(u'Cannot search iTunes (no network connection)')
            return []

    def lookup_albums(self, items):
        """Return AlbumInfo objects for the albums containing the scanned
        track IDs of `items`.

        The configured lookup countries are queried in order. IDs that were
        found in one store are not queried again; once every ID is accounted
        for the remaining countries are skipped.
        """
        ids = [str(_track_ids[item]) for item in items if item in _track_ids]
        if not ids:
            # Nothing was scanned for these items: avoid one pointless
            # request per country.
            return []

        albums = []
        for country in self.lookup_countries:
            self._log.debug("Searching iTunes Store: {0}", country)
            try:
                results = itunespy.lookup_album(','.join(ids), country=country)
                found = [
                    self.make_album_info(album, True)
                    for album in results
                    if isinstance(album, itunespy.music_album.MusicAlbum)
                ]
            except LookupError:
                continue
            albums += found
            # Remove IDs that have been found. `ids` holds strings, so the
            # numeric track IDs must be converted before comparing. The track
            # list already fetched for the AlbumInfo is reused here.
            for info in found:
                for track in info.tracks:
                    track_id = str(track.itunes_content_id)
                    if track_id in ids:
                        ids.remove(track_id)
            # If an album has been found for all tracks stop searching.
            if not ids:
                break
        return albums

    def search_albums(self, artist, album, va_likely):
        """Searches for matching albums on iTunes.

        When `va_likely` is set only the album title is used as the query,
        otherwise artist and album are combined. Returns the AlbumInfo
        objects of all configured search countries.
        """
        if self.sanitize_searches:
            # Strip trailing "(...)", "[...]" and "{...}" groups.
            pattern = re.compile(r'(\s*(\(.*\)|\[.*\]|\{.*\}))*$')
            album = pattern.sub('', album)
            artist = pattern.sub('', artist)
        if va_likely:
            query = album
        else:
            query = '%s %s' % (artist, album)

        albums = []
        for country in self.search_countries:
            try:
                results = itunespy.search_album(query, country=country, limit=self.search_limit)
                albums += [self.make_album_info(result) for result in results]
            except LookupError:
                # One store without results must not discard the results of
                # the other configured stores.
                self._log.debug(u'Search for {0} did return no results', query)
                continue
        return albums

    def album_for_id(self, album_id):
        """Return an AlbumInfo for `album_id`, or None if no configured
        lookup country knows the ID or the lookup fails.
        """
        try:
            for country in self.lookup_countries:
                try:
                    album = itunespy.lookup_album(album_id, country=country)[0]
                    return self.make_album_info(album, True)
                except LookupError:
                    pass
        except ConnectionError:
            self._log.debug(u'Cannot search iTunes (no network connection)')
            return None
        self._log.debug(u'The lookup for an album with id {0} returned zero results.', album_id)
        return None

    def track_for_id(self, track_id):
        """Return a TrackInfo for `track_id`, or None if no configured
        lookup country knows the ID or the lookup fails.
        """
        try:
            for country in self.lookup_countries:
                try:
                    track = itunespy.lookup_track(track_id, country=country)[0]
                    # A single looked-up track has no position on a release.
                    return self.make_track_info(track)
                except LookupError:
                    pass
        except ConnectionError:
            self._log.debug(u'Cannot search iTunes (no network connection)')
            return None
        self._log.debug(u'The lookup for a track with id {0} returned zero results.', track_id)
        return None

    def track_distance(self, item, info):
        """Return a Distance that penalizes candidates whose iTunes content
        ID differs from the track ID scanned for `item` (if any).
        """
        dist = Distance()
        if item in _track_ids:
            dist.add_equality('track_id', _track_ids[item], getattr(info, 'itunes_content_id', None))
        return dist

    def apply_itunes_metadata(self, session, task):
        """Copy the iTunes-specific fields of the chosen match onto the
        imported items.
        """
        # NOTE(review): this runs for every applied match; fields a candidate
        # does not provide are set to None on the item.
        track_tags = ['genre', 'title_sort', 'album_sort', 'itunes_content_id',
                      'itunes_artist_id', 'itunes_advisory']
        for item in task.imported_items():
            if task.is_album:
                info = task.match.mapping[item]
            else:
                # Singleton matches carry the track info directly.
                info = task.match.info
            for tag in track_tags:
                item[tag] = getattr(info, tag, None)
            if task.is_album:
                album_info = task.match.info
                for tag in ['copyright']:
                    item[tag] = getattr(album_info, tag, None)
                # Fix Release Date
                date_parts = (album_info.year, album_info.month, album_info.day)
                if None not in date_parts:
                    item.release_date = "%04i-%02i-%02i" % date_parts
            item.itunes_media_type = 1  # Set media type to "Music"

    def fetch_artwork(self, session, task):
        """Import stage: download the artwork of the chosen match into a
        temporary file and remember its path on the task as
        `itunes_artwork_file`.

        Does nothing if the task has no match or the match carries no
        artwork URL. Download errors are logged and otherwise ignored.
        """
        match = getattr(task, 'match', None)
        info = getattr(match, 'info', None)
        artwork_url = getattr(info, 'itunes_artwork_url', None)
        if not artwork_url:
            return

        self._log.debug(u"Fetching artwork from iTunes...")
        extension = os.path.splitext(artwork_url)[1]
        temp_path = None
        try:
            with urllib.request.urlopen(artwork_url) as response, \
                    NamedTemporaryFile(suffix=extension, delete=False) as tempfile:
                temp_path = tempfile.name
                shutil.copyfileobj(response, tempfile)
        except OSError as exc:
            # Artwork is optional: do not abort the import because of it.
            self._log.debug(u"Could not fetch iTunes artwork: {0}", exc)
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)
            return
        task.itunes_artwork_file = temp_path
        self._log.debug(u"iTunes artwork downloaded to {0}", task.itunes_artwork_file)

    def assign_artwork(self, session, task):
        """Attach previously downloaded artwork to the imported album and
        delete the temporary file.
        """
        artwork_file = getattr(task, 'itunes_artwork_file', None)
        if artwork_file is None:
            return
        try:
            album = getattr(task, 'album', None)
            if album is not None:
                album.set_art(artwork_file)
                album.art_source = "iTunes"
                album.store()
        finally:
            # The temporary download is removed even if it was not used.
            os.remove(artwork_file)

    def make_album_info(self, album, matched=False):
        """Build an AlbumInfo from an itunespy album result.

        `matched` selects the label: "iTunes Match" for ID lookups,
        "iTunes Store" for search results.
        """
        # Fetch the track list once and reuse it below.
        album_tracks = album.get_tracks()
        # Track positions on the release are counted from 1.
        tracks = [self.make_track_info(track, index)
                  for (index, track) in enumerate(album_tracks, 1)]
        # The release date is read as "YYYY-MM-DD..." by position.
        year = int(album.release_date[0:4])
        month = int(album.release_date[5:7])
        day = int(album.release_date[8:10])
        info = AlbumInfo(album.collection_name,
                         album.collection_id,
                         album.artist_name,
                         album.artist_id,
                         tracks,
                         asin = None,
                         albumtype = None,
                         va = album.artist_id == VA_ARTIST_ID,
                         year = year,
                         month = month,
                         day = day,
                         label = "iTunes Match" if matched else "iTunes Store",
                         mediums = album_tracks[0].disc_count if album_tracks else None,
                         artist_sort = sort_string(album.artist_name),
                         releasegroup_id = None,
                         catalognum = None,
                         script = "Unicode",
                         language = None,
                         country = album.country,
                         albumstatus = None,
                         media = None,
                         albumdisambig = None,
                         artist_credit = None,
                         original_year = year,  # In order to eliminate the unnecessary year penalty
                         original_month = month,  # See original_year
                         original_day = day,  # See original_year
                         data_source = "iTunes",
                         data_url = album.collection_view_url)
        info.album_sort = sort_string(album.collection_name)
        info.itunes_content_id = album.collection_id
        info.itunes_artist_id = album.artist_id
        info.genre = album.primary_genre_name
        info.itunes_advisory = decode_explicitness(album.collection_explicitness)
        info.itunes_artwork_url = decode_artwork_url(album.artwork_url_100)
        info.copyright = getattr(album, 'copyright', None)
        return info

    def make_track_info(self, track, index=None):
        """Build a TrackInfo from an itunespy track result.

        `index` is the position of the track on the whole release; it may
        be omitted for tracks that are looked up on their own.
        """
        info = TrackInfo(track.track_name,
                         track.track_id,
                         artist = track.artist_name,
                         artist_id = track.artist_id,
                         # track_time is divided by 1000 (presumably ms -> s)
                         length = track.track_time / 1000 if hasattr(track, 'track_time') else None,
                         index = index,
                         medium = track.disc_number,
                         medium_index = track.track_number,
                         medium_total = track.track_count,
                         artist_sort = sort_string(track.artist_name),
                         disctitle = None,
                         artist_credit = None,
                         data_source = "iTunes",
                         data_url = track.track_view_url,
                         media = None,
                         lyricist = None,
                         composer = None,
                         composer_sort = None,
                         arranger = None,
                         track_alt = None)
        info.title_sort = sort_string(track.track_name)
        info.album_sort = sort_string(track.collection_name)
        info.itunes_content_id = track.track_id
        info.itunes_artist_id = track.artist_id
        info.genre = track.primary_genre_name
        info.itunes_advisory = decode_explicitness(track.track_explicitness)
        info.itunes_artwork_url = decode_artwork_url(track.artwork_url_100)
        return info

    def add_prompt_choices(self, session, task):
        """Print a hint when the top candidate is the only iTunes candidate.

        No additional prompt choices are returned.
        """
        itunes_match_count = sum(
            1 for match in task.candidates
            if match.info.data_source == 'iTunes'
        )
        if itunes_match_count == 1 and task.candidates[0].info.data_source == 'iTunes':
            print_(colorize('text_highlight', 'This is the only iTunes Match'))
def decode_explicitness(advisory_string):
    """Map an explicitness string to its numeric advisory code.

    'notExplicit' -> 0, 'explicit' -> 1, 'cleaned' -> 2.
    Raises ValueError for any other value.
    """
    known_values = ('notExplicit', 'explicit', 'cleaned')
    # The position within the tuple is the advisory code.
    return known_values.index(advisory_string)
def decode_artwork_url(url):
    """Return `url` with every "100x100" size marker replaced by
    "1500x1500".
    """
    thumbnail_size = "100x100"
    full_size = "1500x1500"
    return url.replace(thumbnail_size, full_size)
def sort_string(value):
    """Return `value` without a leading article, for use as a sort key.

    Recognized articles (case-insensitive, each followed by a space):
    A, An, The, Der, Die, Das, Ein, Eine. Only a match at the very start
    of the string is removed.
    """
    leading_article = re.compile('^(A |An |The |Der |Die |Das |Ein |Eine )', re.IGNORECASE)
    return leading_article.sub('', value)