Source code for asyncspotify.oauth.flows

import logging
from json import JSONDecodeError, dumps, loads
from os.path import isfile
from urllib.parse import parse_qs, urlparse

from .mixins import RefreshableFlowMixin
from .response import AuthenticationResponse, AuthorizationCodeFlowResponse
from ..http import Route
from ..scope import Scope

log = logging.getLogger(__name__)

AUTHORIZE_URL = 'https://accounts.spotify.com/authorize'
TOKEN_URL = 'https://accounts.spotify.com/api/token'


class Authenticator:
	'''Base authentication class. All authenticators should inherit from this, custom or not.'''

	_client = None
	_data: AuthenticationResponse = None

	def __init__(self, client_id, client_secret=None):
		self.client_id = client_id
		self.client_secret = client_secret

	def __call__(self, client):
		self._client = client
		return self

	async def close(self):
		pass

	@property
	def header(self):
		if self._data is None:
			raise ValueError('Not authorized yet, no Authorization header to craft')
		return self._data.header

	@property
	def client(self):
		if self._client is None:
			raise ValueError('Client not set for authenticator yet.')
		return self._client

	@property
	def market(self):
		raise RuntimeError('This authenticator does not ensure a market.')

	async def authorize(self):
		raise NotImplementedError


[docs]class ClientCredentialsFlow(Authenticator, RefreshableFlowMixin): ''' Implements the Client Credentials flow. You can only access public resources using this authenticator. ''' def __init__(self, client_id, client_secret, response_class=AuthenticationResponse): super().__init__(client_id, client_secret) self.response_class = response_class @property def market(self): # TODO: ??? does this make sense to do? I have no idea. return 'US'
[docs] async def authorize(self): '''Authorize using this authenticator.''' await self.refresh(start_task=True)
async def token(self): d = dict( grant_type='client_credentials', client_id=self.client_id, client_secret=self.client_secret ) data = await self._token(d) return AuthenticationResponse(data)
[docs]class AuthorizationCodeFlow(Authenticator, RefreshableFlowMixin): ''' Implements the Authorization Code flow. .. note:: This class is not for general use, please use :class:`EasyAuthorizationCodeFlow` or subclass this and implement your own load(), store(response) and setup() methods. client_id: str Your application client id. client_secret: str Your application client secret. scope: :class:`Scope` The scope you're requesting. redirect_uri: str Where the user will be redirected to after accepting the client. response_class: The type that is expected to be returned from load() and setup(), and is passed to store(response) when a token refresh happens. Should be :class:`AuthorizationCodeFlowResponse` or inherit from it. ''' _data: AuthorizationCodeFlowResponse def __init__(self, client_id, client_secret, scope, redirect_uri, response_class=AuthorizationCodeFlowResponse): super().__init__(client_id, client_secret) assert isinstance(scope, Scope) self.scope = scope self.redirect_uri = redirect_uri self.response_class = response_class @property def market(self): return 'from_token' async def token(self): if self._data is None: raise ValueError('Can\'t refresh token without previous refresh token') data = dict( grant_type='refresh_token', refresh_token=self._data.refresh_token, client_id=self.client_id, client_secret=self.client_secret ) data = await self._token(data) ins = self.response_class(data) ins.refresh_token = self._data.refresh_token return ins
[docs] async def authorize(self): '''Authorize the client. Reads from the file specificed by `store`.''' data = await self.load() # no data found, run first time setup # get response class, pass it to .store if data is None: data = await self.setup() if isinstance(data, AuthenticationResponse): await self.store(data) if not isinstance(data, AuthenticationResponse): raise TypeError('setup() has to return an AuthenticationResponse') self._data = data # refresh it now if it's expired if self._data.is_expired(): await self.refresh(start_task=True) else: # manually start refresh task if we didn't refresh on startup self.refresh_in(self._data.seconds_until_expire())
async def setup(self): raise NotImplementedError async def store(self, response): raise NotImplementedError async def load(self): raise NotImplementedError
[docs] def create_authorize_route(self): '''Craft the :class:`Route` for the user to use for authorizing the client.''' params = dict( client_id=self.client_id, redirect_uri=self.redirect_uri, response_type='code' ) if self.scope is not None: params['scope'] = self.scope.string() return Route('GET', AUTHORIZE_URL, **params)
[docs] def get_code_from_redirect(self, url): '''Extract the authorization code from the redirect uri.''' parsed = urlparse(url.strip()) query = parsed.query if not query: raise ValueError('Unable to parse that redirect url') qs = parse_qs(query) if 'code' not in qs: raise ValueError('Redirect url seems to be missing code fragment') return qs['code'][0]
def create_token_data_from_code(self, code): return dict( client_id=self.client_id, client_secret=self.client_secret, grant_type='authorization_code', code=code, redirect_uri=self.redirect_uri )
[docs]class EasyAuthorizationCodeFlow(AuthorizationCodeFlow): def __init__(self, client_id, client_secret, scope=Scope.none(), storage='secret.json', response_class=AuthorizationCodeFlowResponse): super().__init__(client_id, client_secret, scope, 'http://localhost/', response_class) self.storage = storage async def setup(self): fmt = ( 'Hi! This is the initial EasyAuthorizationCode setup.\n\n' 'Please open this URL:\n{0}\n\n' 'and then input the URL you were redirected to after accepting here:\n' ).format(str(self.create_authorize_route())) code_url = input(fmt) code = self.get_code_from_redirect(code_url) d = self.create_token_data_from_code(code) data = await self._token(d) return self.response_class(data) async def load(self): if isfile(self.storage): # if storage file exists, read and deserialize it with open(self.storage, 'r') as f: try: raw_data = loads(f.read()) except JSONDecodeError: return None # return the response instance return self.response_class.from_data(raw_data) async def store(self, response): # simply store the response as a dumped json dict with open(self.storage, 'w') as f: f.write(dumps(response.to_dict(), indent=2))