1

Add Plugins sources and itunes

This commit is contained in:
Kim Wittenburg
2018-09-03 17:57:48 +02:00
parent 7c78f896a8
commit 6d9ed81182
3 changed files with 366 additions and 0 deletions

2
beetsplug/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

325
beetsplug/itunes.py Normal file
View File

@@ -0,0 +1,325 @@
import os
import re
import struct
import shutil
import itunespy
import urllib.request
from tempfile import NamedTemporaryFile
from beets import config
from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance
from beets.plugins import BeetsPlugin, sanitize_choices
import beets.mediafile as mediafile
AVAILABLE_COUNTRIES = ['DE', 'US', 'GB', 'FR', 'AU', 'CA', 'IT', 'JP', 'DZ', 'AO', 'AI', 'AG', 'AR', 'AM', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB', 'BY', 'BE', 'BZ', 'BM', 'BO', 'BW', 'BR', 'BN', 'BG', 'CM', 'KY', 'CL', 'CN', 'CO', 'CR', 'CI', 'HR', 'CY', 'CZ', 'DK', 'DM', 'DO', 'EC', 'EG', 'SV', 'EE', 'ET', 'FI', 'GH', 'GR', 'GD', 'GT', 'GY', 'HN', 'HK', 'HU', 'IS', 'IN', 'ID', 'IE', 'IL', 'JM', 'JO', 'KZ', 'KE', 'KR', 'KW', 'LV', 'LB', 'LY', 'LI', 'LT', 'LU', 'MO', 'MK', 'MG', 'MY', 'MV', 'ML', 'MT', 'MU', 'MX', 'MD', 'MS', 'MM', 'NP', 'NL', 'NZ', 'NI', 'NE', 'NG', 'NO', 'OM', 'PK', 'PA', 'PY', 'PE', 'PH', 'PL', 'PT', 'QA', 'RO', 'RU', 'KN', 'LC', 'VC', 'SA', 'SN', 'RS', 'SG', 'SK', 'SI', 'ZA', 'ES', 'LK', 'SR', 'SE', 'CH', 'TW', 'TZ', 'TH', 'TT', 'TN', 'TR', 'TC', 'UG', 'UA', 'AE', 'UY', 'UZ', 'VE', 'VN', 'VG', 'YE']
VA_ARTIST_ID = 120644327
_track_ids = {}
class SearchAPIPlugin(BeetsPlugin):
"""A beets plugin that adds the iTunes API as a metadata source.
The plugin can also lookup song metadata of songs the have been
matched by iTunes Match.
"""
def __init__(self):
super(SearchAPIPlugin, self).__init__()
self.add_media_field('title_sort', mediafile.MediaField(
mediafile.MP3StorageStyle('TSOT'),
mediafile.MP4StorageStyle('sonm'),
mediafile.StorageStyle('TITLESORT'),
mediafile.ASFStorageStyle('WM/TitleSortOrder')
))
self.add_media_field('album_sort', mediafile.MediaField(
mediafile.MP3StorageStyle('TSOA'),
mediafile.MP4StorageStyle('soal'),
mediafile.StorageStyle('ALBUMSORT'),
mediafile.ASFStorageStyle('WM/AlbumSortOrder')
))
self.add_media_field('itunes_content_id', mediafile.MediaField(
mediafile.MP4StorageStyle('cnID', as_type=int),
mediafile.StorageStyle(u'ITUNES_CONTENT_ID', as_type=int)
))
self.add_media_field('itunes_artist_id', mediafile.MediaField(
mediafile.MP4StorageStyle('atID', as_type=int),
mediafile.StorageStyle(u'ITUNES_ARTIST_ID', as_type=int)
))
self.add_media_field('itunes_genre_id', mediafile.MediaField(
mediafile.MP4StorageStyle('geID', as_type=int),
mediafile.StorageStyle('ITUNES_GENRE_ID', as_type=int)
))
self.add_media_field('itunes_advisory', mediafile.MediaField(
mediafile.MP4StorageStyle('rtng', as_type=int),
mediafile.StorageStyle('ITUNES_ADVISORY', as_type=int)
))
self.add_media_field('itunes_media_type', mediafile.MediaField(
mediafile.MP4StorageStyle('stik', as_type=int),
mediafile.StorageStyle('ITUNES_MEDIA_TYPE', as_type=int)
))
self.config.add({
'search_countries': ['US'],
'lookup_countries': ['US', 'DE', 'AU', 'GB', 'FR', 'IT', 'JP'],
'search_limit': 5,
'artwork': False, # Whether to also download artwork from iTunes. Does not work well with fetchart
'method': 'lookup', # Valid values: 'search', 'lookup', 'both', 'lookup (search fallback)'
})
if self.method in ['lookup', 'lookup (search fallback)', 'both']:
self.register_listener('import_task_start', self.scan_track_ids)
self.register_listener('import_task_apply', self.apply_itunes_metadata)
if self.use_itunes_artwork:
self.import_stages += [ self.fetch_artwork ]
self.register_listener('import_task_files', self.assign_artwork)
@property
def method(self):
return self.config['method'].as_str()
@property
def use_itunes_artwork(self):
return self.config['artwork']
@property
def search_countries(self):
countries = self.config['search_countries'].as_str_seq()
return sanitize_choices(countries, AVAILABLE_COUNTRIES)
@property
def lookup_countries(self):
countries = self.config['lookup_countries'].as_str_seq()
return sanitize_choices(countries, AVAILABLE_COUNTRIES)
@property
def search_limit(self):
return self.config['search_limit'].as_number()
def scan_track_ids(self, session, task):
self._log.debug("Scanning track IDs")
items = task.items if task.is_album else [ task.item ]
for item in items:
with open(item.path, 'rb') as file:
data = file.read(1000) # Song ID always seems to be around 600-700 bytes in
location = data.find(b'song')
if location < 0:
continue
file.seek(location + 4)
song_id = struct.unpack('>i', file.read(4))
_track_ids[item] = song_id[0]
self._log.debug("Found track ID {0} for {1}", song_id[0], item)
def candidates(self, items, artist, album, va_likely):
"""Returns a list of AlbumInfo objects for iTunes search results
matching an album and artist.
"""
candidates = []
# Lookup the track ids assigned by iTunes Match
if self.method in ['lookup', 'lookup (search fallback)', 'both']:
candidates += self.lookup_albums(items)
# If the lookup was not successful ignore the ID for distance evaluation
if self.method == 'lookup (search fallback)' and len(candidates) == 0:
for item in items:
_track_ids.pop(item, None)
if self.method in ['search', 'both'] or (self.method == 'lookup (search fallback)' and len(candidates) == 0):
candidates += self.search_albums(artist, album, va_likely)
return candidates
def lookup_albums(self, items):
ids = [ str(_track_ids[item]) for item in items if item in _track_ids ]
def remove_found_tracks(album):
for track in album.get_tracks():
try:
ids.remove(track.track_id)
except ValueError:
pass
albums = []
try:
for country in self.lookup_countries:
self._log.debug("Searching iTunes Store: {0}", country)
try:
results = itunespy.lookup_album(','.join(ids), country=country)
album_results = [ album for album in results if isinstance(album, itunespy.music_album.MusicAlbum) ]
albums += [ self.make_album_info(album, True) for album in album_results ]
# Remove IDs that have been found. If an album has been found for all tracks stop searching.
for album in album_results:
remove_found_tracks(album)
if not ids:
break
except LookupError:
pass
except ConnectionError:
self._log.debug(u'Cannot search iTunes (no network connection)')
return []
return albums
def search_albums(self, artist, album, va_likely):
"""Searches for matching albums on iTunes.
"""
if va_likely:
query = album
else:
query = '%s %s' % (artist, album)
albums = []
for country in self.search_countries:
try:
results = itunespy.search_album(query, country=country, limit=self.search_limit)
albums += [self.make_album_info(result) for result in results]
except ConnectionError:
self._log.debug(u'Cannot search iTunes (no network connection)')
return []
except LookupError:
self._log.debug(u'Search for {0} did return no results', query)
return []
return albums
def album_for_id(self, album_id):
try:
for country in self.lookup_countries:
try:
album = itunespy.lookup_album(album_id, country=country)
return self.make_album_info(album, True)
except LookupError:
pass
except ConnectionError:
self._log.debug(u'Cannot search iTunes (no network connection)')
return None
self._log.debug(u'The lookup for an album with id {0} returned zero results.', album_id)
return None
def track_for_id(self, track_id):
try:
for country in self.lookup_countries:
try:
track = itunespy.lookup_track(track_id, country=country)
return self.make_track_info(track)
except LookupError:
pass
except ConnectionError:
self._log.debug(u'Cannot search iTunes (no network connection)')
return None
self._log.debug(u'The lookup for a track with id {0} returned zero results.', track_id)
return None
def track_distance(self, item, info):
dist = Distance()
if item in _track_ids:
dist.add_equality('track_id', _track_ids[item], getattr(info, 'itunes_content_id', None))
return dist
def apply_itunes_metadata(self, session, task):
for item in task.imported_items():
info = task.match.mapping[item]
try:
item.genre = info.genre
if not item.title == info.title_sort:
item.title_sort = info.title_sort
if not item.album == info.album_sort:
item.album_sort = info.album_sort
item.itunes_content_id = info.itunes_content_id
item.itunes_artist_id = info.itunes_artist_id
item.itunes_advisory = info.itunes_advisory
item.itunes_media_type = 1 # Set media type to "Music"
except AttributeError:
pass
def fetch_artwork(self, session, task):
try:
artwork_url = task.match.info.itunes_artwork_url
self._log.debug(u"Fetching artwork from iTunes...")
filename, extension = os.path.splitext(artwork_url)
with urllib.request.urlopen(artwork_url) as response, NamedTemporaryFile(suffix=extension, delete=False) as tempfile:
shutil.copyfileobj(response, tempfile)
task.itunes_artwork_file = tempfile.name
self._log.debug(u"iTunes artwork downloaded to {0}", task.artwork_file)
except AttributeError:
pass
def assign_artwork(self, session, task):
try:
artwork_file = task.itunes_artwork_file
task.album.set_art(artwork_file)
task.album.art_source = "iTunes"
task.album.store()
os.remove(artwork_file)
except AttributeError:
pass
def make_album_info(self, album, matched=False):
tracks = [ self.make_track_info(track, index) for (index, track) in enumerate(album.get_tracks()) ]
info = AlbumInfo(album.collection_name,
album.collection_id,
album.artist_name,
album.artist_id,
tracks,
asin = None,
albumtype = None,
va = album.artist_id == VA_ARTIST_ID,
year = album.release_date[0:4],
month = album.release_date[5:7],
day = album.release_date[8:10],
label = "iTunes Match" if matched else "iTunes Store",
mediums = album.get_tracks()[0].disc_count,
artist_sort = sort_string(album.artist_name),
releasegroup_id = None,
catalognum = None,
script = "Unicode",
language = None,
country = album.country,
albumstatus = None,
media = None,
albumdisambig = None,
artist_credit = None, # Or Artist?
original_year = None,
original_month = None,
original_day = None,
data_source = "iTunes",
data_url = album.collection_view_url)
info.album_sort = sort_string(album.collection_name)
info.itunes_content_id = album.collection_id
info.itunes_artist_id = album.artist_id
info.genre = album.primary_genre_name
info.itunes_advisory = decode_explicitness(album.collection_explicitness)
info.itunes_artwork_url = decode_artwork_url(album.artwork_url_100)
return info
def make_track_info(self, track, index):
info = TrackInfo(track.track_name,
track.track_id,
artist = track.artist_name,
artist_id = track.artist_id,
length = track.track_time / 1000,
index = index,
medium = track.disc_number,
medium_index = track.track_number,
medium_total = track.track_count,
artist_sort = sort_string(track.artist_name),
disctitle = None,
artist_credit = None,
data_source = "iTunes",
data_url = track.track_view_url,
media = None,
lyricist = None,
composer = None,
composer_sort = None,
arranger = None,
track_alt = None)
info.title_sort = sort_string(track.track_name)
info.album_sort = sort_string(track.collection_name)
info.itunes_content_id = track.track_id
info.itunes_artist_id = track.artist_id
info.genre = track.primary_genre_name
info.itunes_advisory = decode_explicitness(track.track_explicitness)
info.itunes_artwork_url = decode_artwork_url(track.artwork_url_100)
return info
def decode_explicitness(advisory_string):
return ['notExplicit', 'explicit', 'cleaned'].index(advisory_string)
def decode_artwork_url(url):
return url.replace("100x100", "1500x1500")
def sort_string(value):
return re.sub('^(A |An |The |Der |Die |Das |Ein |Eine )', '', value, flags=re.IGNORECASE)

39
beetsplug/sources.py Normal file
View File

@@ -0,0 +1,39 @@
import re
from beets import config
from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance
from beets.plugins import BeetsPlugin
class SearchAPIPlugin(BeetsPlugin):
"""A beets plugin that lets you set preferences for different metadata sources.
"""
def __init__(self):
super(SearchAPIPlugin, self).__init__()
self.config.add({
'auto': True, # Enables or disables the plugin
'preferred': [ '*' ],
})
@property
def preferred_sources(self):
"""Reads the user's preferred sources, parses
them as regular expressions and returns a list
of the resulting Patterns.
"""
preferences = self.config['preferred'].get()
return [re.compile('.*') if item == '*' else re.compile(item) for item in preferences]
def album_distance(self, items, album_info, mapping):
"""Returns the album distance.
"""
dist = Distance()
dist.add_priority('source', album_info.data_source, self.preferred_sources)
return dist
def track_distance(self, item, info):
"""Returns the track distance.
"""
dist = Distance()
dist.add_priority('source', info.data_source, self.preferred_sources)
return dist