Source code for invenio_oauth2server.provider

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Configuration of Flask-OAuthlib provider."""

from datetime import datetime, timedelta

from flask import current_app
from flask_login import current_user
from flask_oauthlib.provider import OAuth2Provider
from flask_security.utils import verify_password
from invenio_db import db
from werkzeug.local import LocalProxy

from .models import Client, Token
from .scopes import email_scope

# Flask-OAuthlib provider instance. The callbacks defined in this module are
# registered on it through its ``usergetter``/``tokengetter``/``clientgetter``/
# ``tokensetter`` decorators.
oauth2 = OAuth2Provider()
# Lazy proxy to the datastore exposed by the ``security`` extension of the
# current Flask application; resolving it requires an application context.
datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)


@oauth2.usergetter
def get_user(email, password, *args, **kwargs):
    """Resolve the user behind a ``password`` grant request.

    Registered as the provider's user getter, which is needed for grant type
    'password'. Note, grant type password is by default disabled.

    :param email: User email used to look the account up.
    :param password: Password to verify against the stored one.
    :returns: The user instance, or ``None`` when no user matches the email,
        the user is not active, or the password does not verify.
    """
    account = datastore.find_user(email=email)

    # Reject as early as possible; the password is only verified for an
    # existing, active account.
    if not account:
        return None
    if not account.active:
        return None
    if not verify_password(password, account.password):
        return None

    return account
@oauth2.tokengetter
def get_token(access_token=None, refresh_token=None):
    """Look up a token by access token or, failing that, by refresh token.

    Adds support for personal access tokens compared to flask-oauthlib:
    whenever a personal token owned by an active user is loaded through its
    access token, its expiry is pushed forward by
    ``OAUTH2_PROVIDER_TOKEN_EXPIRES_IN`` seconds from now.

    The refresh token is only considered when no access token is given, and
    only matches non-personal tokens that belong to a confidential client.

    :param access_token: The access token. (Default: ``None``)
    :param refresh_token: The refresh token. (Default: ``None``)
    :returns: The token instance, or ``None`` when nothing matches or the
        token's user is not active.
    """
    if not access_token and not refresh_token:
        return None

    if access_token:
        found = Token.query.filter_by(access_token=access_token).first()
        if found and found.is_personal and found.user.active:
            # Slide the expiry window of the personal token on every lookup.
            now = datetime.utcnow()
            lifetime = int(
                current_app.config.get('OAUTH2_PROVIDER_TOKEN_EXPIRES_IN')
            )
            found.expires = now + timedelta(seconds=lifetime)
    else:
        refreshable = Token.query.join(Token.client).filter(
            Token.refresh_token == refresh_token,
            Token.is_personal == False,  # noqa
            Client.is_confidential == True,  # noqa
        )
        found = refreshable.first()

    if found and found.user.active:
        return found
    return None
@oauth2.clientgetter
def get_client(client_id):
    """Fetch an OAuth client by its identifier.

    Needed for grant_type client_credentials. A client is only returned
    while the user it belongs to is active, so deactivating a user also
    disables that user's clients.

    :param client_id: The client ID.
    :returns: The client instance, or ``None`` when the client does not
        exist or its user is not active.
    """
    found = Client.query.get(client_id)
    if not found or not found.user.active:
        return None
    return found
@oauth2.tokensetter
def save_token(token, request, *args, **kwargs):
    """Persist a newly issued token, replacing the user's previous ones.

    The token response is first enriched with user information (the user id,
    plus email details when the email scope was granted). Every existing
    non-personal token for the same client and user is then deleted and
    committed, after which the new token is stored and committed.

    :param token: A dictionary with the token data. It is updated in place
        with a ``user`` entry.
    :param request: The request instance. ``request.user`` is used when set,
        otherwise the currently logged-in user.
    :returns: A :class:`invenio_oauth2server.models.Token` instance.
    """
    user = request.user if request.user else current_user

    # Add user information in token endpoint response.
    # Currently, this is the only way to have the access to the user of the
    # token as well as the token response.
    token.update(user={'id': user.get_id()})

    # Add email if scope granted.
    if email_scope.id in token.scopes:
        token['user'].update(
            email=user.email,
            email_verified=user.confirmed_at is not None,
        )

    client_id = request.client.client_id

    # Make sure that every client has only one token connected to a user.
    # Personal access tokens are excluded by the ``is_personal=False`` filter
    # and are therefore never removed here.
    previous_tokens = Token.query.filter_by(
        client_id=client_id,
        user_id=user.id,
        is_personal=False,
    )
    # No truthiness guard around the loop: a query object is always truthy,
    # so such a check never skipped anything. Iterating an empty result is a
    # no-op and the commit below has always run unconditionally.
    for previous in previous_tokens:
        db.session.delete(previous)
    db.session.commit()

    expires_in = token.get('expires_in')
    expires = datetime.utcnow() + timedelta(seconds=int(expires_in))

    tok = Token(
        access_token=token['access_token'],
        refresh_token=token.get('refresh_token'),
        token_type=token['token_type'],
        _scopes=token['scope'],
        expires=expires,
        client_id=client_id,
        user_id=user.id,
        is_personal=False,
    )
    db.session.add(tok)
    db.session.commit()
    return tok