Añadiendo autenticación a nuestro servicio REST con Flask

Holas, después de un tiempo de inactividad en el blog les vengo a comentar como hacer que nuestro servicio REST para crear ToDo's tenga autenticación, para ello vamos a usar JWT para gestionar la sesión.

Para poder trabajar con la autenticación vamos a usar la librería Flask-JWT-Extended la cual podemos instalar con pip install Flask-JWT-Extended o añadirla a nuestro archivo de requirements.txt el cual nos quedará de la siguiente manera:

Flask
Flask-Cors
Flask-SQLAlchemy
Flask-Migrate
Flask-JWT-Extended

Y luego instalar todo lo que tenga requirements.txt en nuestro entorno virtual

Como siguiente paso tenemos que añadir JWT al archivo app/init.py el cual nos quedará de la siguiente forma:

from flask import Flask
from flask_cors import CORS
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager

from config import Config

app = Flask(__name__)
app.config.from_object(Config)
CORS(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# JWT Config
app.config["JWT_SECRET_KEY"] = "my-top-secret-key" # Cambiar por la clave super hiper mega secreta que ustedes deseen
app.config['JWT_BLACKLIST_ENABLED'] = True
app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = ['access', 'refresh']

jwt = JWTManager(app)

# Token blacklist checker
@jwt.token_in_blacklist_loader
def check_if_token_in_blacklist(decrypted_token):
    jti = decrypted_token['jti']
    return jti in blacklist

from app.views import todo, user_views # Añadimos "user_views" para poder usar los endpoints de usuario {register/login}
from app.views.user_views import blacklist

Bien, ahora tenemos que añadir una tabla de usuarios a nuestra de base de datos, para ello vamos a nuestro archivos models.py y agregamos el siguiente pedazo de código al inicio:

# Esta clase contiene el modelo para el usuario en nuestra base de datos
class User(db.Model):
    # Id es un entero y es primary key
    id = db.Column(db.Integer, primary_key=True, default=lambda: uuid4().hex)
    # El usuario es un texto y también es primary key
    username = db.Column(db.Text, primary_key=True)
    # La constraseña es un texto y no se permite valores nulos
    password = db.Column(db.Text, nullable=False)

    # Esta función nos permite retornar los datos del usuario como un diccionario para poder convertirlos en JSON
    def json_dump(self):
        return dict(
            id=self.id,
            username=self.username
        )

    # Esta función tiene propósitos de depuración
    def __repr__(self):
        return '<User Name%r>' % self.username

Debido a que ahora tenemos usuarios en la base de datos, vamos a cambiar el modelo de los ToDo's para que cada ToDo tenga un dueño y así trabajar con los ToDo's que pertenecen a un usuario, por lo tanto nuestra clase de ToDo quedará de la siguiente manera:

class ToDo(db.Model):
    # Id es un entero y es primary key
    id = db.Column(db.Integer, primary_key=True)
    # UserID es un entero y es un foreign key hacia la tabla de usuarios
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # Titulo es un String de 100 caracteres, indexado y no se permite valores nulos
    title = db.Column(db.String(100), index=True, nullable=False)
    # Descripcion es un String de 400 caracteres, indexado y no se permite valores nulos
    description = db.Column(db.String(400), index=True, nullable=True)

    # Esta función nos permite retornar los datos del usuario como un diccionario para poder convertirlos en JSON
    def json_dump(self):
        return dict(
            id=self.id,
            user_id=self.user_id,
            title=self.title,
            description=self.description
        )

    # Esta función tiene propósitos de depuración
    def __repr__(self):
        return '<ToDo Title %r>' % self.title

Hasta ahora tenemos los modelos para Usuario y ToDo y también tenemos añadido JWT para su posterior uso, lo cual nos lleva a los endpoints de nuestra aplicación, dicho esto vamos a crear los respectivos endpoints para poder registrar a nuestros usuarios, iniciar sesión, salir de sesión, etc.

from flask import abort, jsonify
from flask import request
from sqlalchemy.exc import IntegrityError
from app import app, db
from app.models import User
from ..utils import json_utils, password_utils
from flask_jwt_extended import create_access_token, jwt_refresh_token_required, \
    create_refresh_token, get_jwt_identity, get_raw_jwt, jwt_required

blacklist = set()

@app.route("/register", methods=['POST'])
def register():
    # Revisamos si el request en un JSON valido
    json_utils.is_not_json_request(request)
    # Extraemos el usuario del request
    username = request.json.get('username')
    # Buscamos si tenemos un usuario registrado con ese nombre
    existent_user = User.query.filter_by(username=username).first()
    # Si el usuario existe retornamos un error con código 409 de conflicto
    if existent_user:
        return jsonify(message="User already exists"), 409
    # Caso contrario procedemos con la creación del usuario
    else:
        # Extraemos el password del request
        password = request.json.get('password')
        # Hacemos un hash del password, para guardarlo de manera segura
        hashed_password = password_utils.get_hashed_password_with_sha512(password)
        # Creamos una nueva instancia de la clase usuario
        new_user = User(username=username, password=hashed_password)
        # Añadimos nuestro usuario a la session de la base de datos
        db.session.add(new_user)
        try:
            # Hacemos commit -> esto ejecutará la operación "INSERT" en la base de datos
            db.session.commit()
        except IntegrityError:
            # Si algo sale mal retornamos codigo de error 400
            abort(400)
        return jsonify(new_user.json_dump()), 201

@app.route("/login", methods=['POST'])
def login():
    # Revisamos si el request en un JSON valido
    json_utils.is_not_json_request(request)
    # Extraemos los datos del request
    username = request.json.get('username')
    password = request.json.get('password')
    # Obtenemos el hash del password
    hashed_password = password_utils.get_hashed_password_with_sha512(password)
    # Buscamos si existe dicho usuario y password en la base de datos
    logged_user = User.query.filter_by(username=username, password=hashed_password).first()
    # Si el usuario existe creamos una identidad para el mismo
    if logged_user:
        import datetime
        # Usamos  el id, el username y la fecha actual para crear la identidad
        user_identity = {
            "user_id": logged_user.id,
            "username": username,
            "date": datetime.datetime.now()
        }
        # Creamos un tiempo de expiración para el token
        expired_time = datetime.timedelta(minutes=60)
        # Tokens de acceso y refresco, para continuar en la sesión
        access_token = create_access_token(identity=user_identity, expires_delta=expired_time)
        refresh_token = create_refresh_token(identity=user_identity)
        # Retornamos 200 con los tokens del usuario
        return jsonify(access_token=access_token, refresh_token=refresh_token, message="Logged successfully"), 200
    else:
        # Si el usuario no existe retornamos 401 de no autorizado
        return jsonify(message="Bad username or password"), 401

# Enpoint para hacer un refrescar del token y seguir en la sesión
@app.route('/refresh', methods=['POST'])
@jwt_refresh_token_required
def refresh():
    import datetime
    # Obtenemos la indentidad del usuario
    current_user = get_jwt_identity()
    # Añadimos 60 minutos más
    expired_time = datetime.timedelta(minutes=60)
    # Creamos el nuevo token
    new_access_token = create_access_token(identity=current_user, expires_delta=expired_time)
    # Retornamos el nuevo token
    return jsonify(access_token=new_access_token, message="Token refresh successfully")

# Endpoint para salir de la sesión
@app.route('/logout', methods=['DELETE'])
@jwt_required
def logout():
    jti = get_raw_jwt()['jti']
    blacklist.add(jti)
    return jsonify({"msg": "Successfully logged out"}), 200

# Endpoint para revocar el refresco del token
@app.route('/revoke', methods=['DELETE'])
@jwt_refresh_token_required
def revoke_user_token():
    jti = get_raw_jwt()['jti']
    blacklist.add(jti)
    return jsonify({"msg": "Successfully revoked"}), 200

Los enpoints para refrescar, salir de sesión y revocar el token de refresco necesitan JWT, puesto que para ello debemos estar con nuestra sesión iniciada

Ahora viene la parte interesante en nuestro proyecto, hacer que los endpoints de ToDo tengan acceso solamente cuando el usuario ha iniciado sesión, para ello vamos a crear los endpoints de la siguiente manera:

import json

from flask import abort, jsonify
from flask import request
from sqlalchemy.exc import IntegrityError

from app import app, db
from app.models import ToDo
from ..utils import json_utils
from flask_jwt_extended import jwt_required, get_jwt_identity

@app.route("/", methods=['GET'])
@jwt_required
def get_all_todos():
    # Cogemos el usuario logueado con el token que se provee en el enpdpoint
    current_user_id = get_jwt_identity()['user_id']
    # Buscamos todos los todo's que tiene ese usuario
    all_todos = ToDo.query.filter_by(user_id=current_user_id)
    # Retornamos los todo's como una lista
    return jsonify([each_todo.json_dump() for each_todo in all_todos])

@app.route("/" + '<string:todo_id>', methods=['GET'])
@jwt_required
def get_todo(todo_id):
    # Cogemos el usuario logueado con el token que se provee en el enpdpoint
    current_user_id = get_jwt_identity()['user_id']
    # Buscamos un todo por su id
    selected_todo = ToDo.query.filter_by(id=todo_id, user_id=current_user_id)
    # Si no existe retornamos 404 de no encontrado
    if selected_todo is None:
        return jsonify(message="Item not found"), 404
    return jsonify(selected_todo.json_dump())

@app.route("/", methods=['POST'])
@jwt_required
def post_todo():
    # Revisamos si el request es un JSON válido
    json_utils.is_not_json_request(request)
    try:
        # Cogemos el usuario logueado con el token que se provee en el enpdpoint
        current_user_id = get_jwt_identity()['user_id']
        # Extraemos los datos del request
        title = request.json.get('title')
        description = request.json.get('description')
        # Creamos una nueva instancia de ToDo
        new_todo = ToDo(user_id=current_user_id, title=title, description=description)
        # Añadimos el ToDo a la sesión de la base de datos
        db.session.add(new_todo)
        # Hacemos commit de la sesión -> Esto ejecutará la operación "INSERT"en la base de datos
        db.session.commit()
    except IntegrityError:
        # Si algo sale mal retornamos error 400
        abort(400)
    return jsonify(request.json), 201

@app.route("/" + '<string:todo_id>', methods=['PUT'])
@jwt_required
def put_todo(todo_id):
    # Cogemos el usuario logueado con el token que se provee en el enpdpoint
    current_user_id = get_jwt_identity()['user_id']
    # Revisamos si el request en un JSON válido
    json_utils.is_not_json_request(request)
    # Buscamos un ToDo por su id
    selected_todo = ToDo.query.filter_by(id=todo_id, user_id=current_user_id).first()
    # Si no existe retornamos 404 de no encontrado
    if selected_todo is None:
        return jsonify(message="Item not found"), 404
    # Extraemos los datos del request  los asignamos a nuestro ToDo
    selected_todo.title = request.json.get('title')
    selected_todo.description = request.json.get('description')
    try:
        # Hacemos commit de la sesion de base de datos
        db.session.commit()
    except IntegrityError:
        # Si algo sale mal retornamos error 400
        abort(400)
    return json.dumps(request.json)

@app.route("/" + '<string:todo_id>', methods=['DELETE'])
@jwt_required
def delete_todo(todo_id):
    # Cogemos el usuario logueado con el token que se provee en el enpdpoint
    current_user_id = get_jwt_identity()['user_id']
    # Buscamos un ToDo por su id
    selected_todo = ToDo.query.filter_by(id=todo_id, user_id=current_user_id)
    # Si el ToDo no existe retornamos 404 de no encontrado
    if selected_todo is None:
        return jsonify(message="Item not found"), 404
    # Añadimos a la sesión de base de datos el ToDo a borrar
    db.session.delete(selected_todo)
    try:
        # Hacemos commit de la sesión -> Esto ejecutará la operación "DELETE" en la base de datos
        db.session.commit()
    except IntegrityError:
        # Si algo sale mal retornamos error 400
        abort(400)
    return ""

El uso de los endpoints es similar al que hemos venido usando en https://blog.nano-bytes.com/2019/11/02/servicio-rest-con-flask-sqlalchemy-y-sqlite/ pero ahora tenemos unos endpoints extra para poder registrar un usario y para poder hacer login.

Registrar -> POST

URL: http://127.0.0.1:8085/register

Cuerpo del registro

{
    "username": "John",
    "password": "1234abcd"
}

Respuesta:

{
  "id": "8134812384838384823",
  "username": "John"
}

Login -> POST

URL: http://127.0.0.1:8085/login

Cuerpo del login

{
    "username": "John",
    "password": "1234abcd"
}

Respuesta:

{
  "access_token": "2398radnfa9djknasdf89q23bjdfowhfasdja8sdfkasdf9ajfbfa8sdhfasdfp9a8sdhfjasb98h89fajksdnfui1efbjIHASDF98H.SdjlIUDFASDF",
  "message": "Logged successfully",
  "refresh_token": "23498dsfnasdhamsnvapisdunvajkndfvuiandfgnasdivajknvpuaner.ADFJUAEFJ9nuansdfkjasnd.aadf"
}

El login nos devolverá un un access_token y un refresh_token por lo tanto tomamos el access_token para utilizarlo como Bearer Token en nuestro aplicativo de REST endpoints como Insomnia o Postman

El token contiene la información del usuario, por lo que los ToDos serán filtrados por dicho usuario, sin necesidad de proveer el id del usuario en la URL del endpoint

Con CURL pueden hacer algo como esto:

curl --request GET \
  --url http://127.0.0.1:8085/ \
  --header 'authorization: Bearer 2398radnfa9djknasdf89q23bjdfowhfasdja8sdfkasdf9ajfbfa8sdhfasdfp9a8sdhfjasb98h89fajksdnfui1efbjIHASDF98H.SdjlIUDFASDF'

Ese es el ejemplo de cómo obtener todos los ToDos de un usuario

La URL donde se encuentra este ejemplo es: https://github.com/nano-bytes/flask/tree/master/simple-todo-with-auth

Eso es todo por hoy, nos vemos en un próxima entrega

Happy Hacking!!

You may also like...