Build An API With A Database From Scratch

The spring semester is over, summer break has begun, and around this time of the year I always feel like learning something new.

The first item on my ‘Summer Learning’ list was building an API: I had never had the chance to build one, and I assumed that at some point I would have to build one at work, or at least modify an existing one.


Briefly, what is an API?

API = Application Programming Interface.

An API handles user requests and returns responses, in the same way that a waiter in a restaurant takes your order and brings you your food.

Like a restaurant, an API has a predefined menu, and the entire interaction is based on it.


Overview

To make this project more interesting, I decided to work with a database.
I browsed Kaggle looking for something that would catch my eye and, to no one’s surprise, we are going to work with a Pokemon dataset. Hopefully you are as excited as I was to find out that such a dataset exists.

You can get the dataset from Kaggle.

Our project is going to consist of two main files — one for handling the database calls, and a second one for the API.

Requirements: FastAPI, uvicorn, pandas

By the end of this article, you will have created the following API


Database

Create your first file, name it database.py and let’s start coding.

Starting with the imports

from pathlib import Path
import sqlite3
import pandas as pd

SQLite, which we access through Python’s built-in sqlite3 module, is a file-based database, so first we need a file that will contain our dataset.

DB_FILENAME = "poke_db.db"

def init_db():
    if not Path(DB_FILENAME).is_file():
        Path(DB_FILENAME).touch()
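
As a side note, sqlite3.connect creates the database file on its own when it does not exist yet, so the explicit touch() is a convenience rather than a requirement. Here is a minimal sketch of that behaviour; it uses a temporary directory and a hypothetical file name (demo.db) so that it does not interfere with poke_db.db.

```python
import sqlite3
import tempfile
from pathlib import Path


def connect_creates_file():
    """Return (existed_before, exists_after) for a fresh database path."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = Path(tmp_dir) / "demo.db"  # hypothetical file name
        existed_before = db_path.is_file()

        conn = sqlite3.connect(db_path)
        # Creating a table forces SQLite to write the file to disk.
        conn.execute("CREATE TABLE IF NOT EXISTS demo (id INTEGER)")
        conn.commit()
        conn.close()

        exists_after = db_path.is_file()
    return existed_before, exists_after


print(connect_creates_file())
```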

Next, we will load the dataset we downloaded to the database file we created.

def load_csv_to_db():
    init_db()
    conn = sqlite3.connect(DB_FILENAME)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Pokemons (idx int, name text,
        type1 text,type2 text, sum_stats int, hp int, attack int,
        special_attack int, defense int, special_defense int)''')

    poke_data = pd.read_csv('Pokemon.csv')

    poke_data.drop(['Speed', 'Generation', 'Legendary'], axis=1,
                    inplace=True)

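    # NOTE: these names are assigned by position, so their order must match
    # the column order of your Pokemon.csv (check the file's header row).
    poke_data.columns = ['idx', 'name', 'type1', 'type2',
                          'sum_stats', 'hp', 'attack',
                          'special_attack', 'defense',
                          'special_defense']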

    poke_data.to_sql('Pokemons', conn, if_exists='append',
                      index=False)

In short, this function creates a table called Pokemons and loads our dataset into the database file.
I also omitted a few columns; you can keep them and add the matching columns to your table.
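
Because the rename in load_csv_to_db is positional, it silently mislabels columns if the CSV order differs from the list. One way around that is to map each header to a table column by name. The sketch below does this with the standard csv module and an in-memory database instead of pandas. The header names in HEADER_TO_COLUMN and the two sample rows are assumptions for the illustration, so adjust them to whatever your Pokemon.csv actually contains.

```python
import csv
import io
import sqlite3

# Assumed CSV header -> table column. Adjust to match your Pokemon.csv.
HEADER_TO_COLUMN = {
    "#": "idx", "Name": "name", "Type 1": "type1", "Type 2": "type2",
    "Total": "sum_stats", "HP": "hp", "Attack": "attack",
    "Sp. Atk": "special_attack", "Defense": "defense",
    "Sp. Def": "special_defense",
}

# Tiny stand-in for the real file (illustrative sample only).
SAMPLE_CSV = """#,Name,Type 1,Type 2,Total,HP,Attack,Defense,Sp. Atk,Sp. Def,Speed,Generation,Legendary
1,Bulbasaur,Grass,Poison,318,45,49,49,65,65,45,1,False
4,Charmander,Fire,,309,39,52,43,60,50,65,1,False
"""


def load_rows(conn, csv_file):
    """Insert CSV rows into Pokemons, matching columns by header name."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS Pokemons (idx int, name text,
        type1 text, type2 text, sum_stats int, hp int, attack int,
        special_attack int, defense int, special_defense int)""")
    columns = list(HEADER_TO_COLUMN.values())
    placeholders = ",".join("?" for _ in columns)
    insert_sql = (f"INSERT INTO Pokemons ({', '.join(columns)}) "
                  f"VALUES ({placeholders})")
    rows = [
        # Empty cells (e.g. no secondary type) are stored as NULL.
        tuple(record[header] or None for header in HEADER_TO_COLUMN)
        for record in csv.DictReader(csv_file)
    ]
    conn.executemany(insert_sql, rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
loaded = load_rows(conn, io.StringIO(SAMPLE_CSV))
bulbasaur = conn.execute(
    "SELECT name, defense, special_attack FROM Pokemons WHERE idx = 1"
).fetchone()
print(loaded, bulbasaur)
```

Columns that are not listed in the mapping (Speed, Generation, Legendary in the sample) are simply ignored, which mirrors the drop call above.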

Before we move on to database queries, I am going to define a function that will be useful later on. It tells us whether our table exists.

def table_exists(cursor):
    cursor.execute('''
        SELECT count(name) FROM sqlite_master WHERE type='table' AND
        name='Pokemons' ''')

    if not cursor.fetchone()[0]:
        return False
    return True
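
To see the check in action without touching poke_db.db, here is a small sketch against a throwaway in-memory database. The table_name parameter is my own addition for the illustration and is not part of the function above.

```python
import sqlite3


def table_exists(cursor, table_name="Pokemons"):
    """Return True if a table with the given name exists."""
    cursor.execute(
        "SELECT count(name) FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    )
    return bool(cursor.fetchone()[0])


conn = sqlite3.connect(":memory:")  # throwaway in-memory database
cursor = conn.cursor()
before = table_exists(cursor)
cursor.execute("CREATE TABLE Pokemons (idx int, name text)")
after = table_exists(cursor)
print(before, after)
```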

Each API call we define will eventually query the database, so we are going to prepare the functionality our API needs beforehand.

Our first API call will retrieve a pokemon’s details by its name, so we will create the matching database query:

def get_poke_by_name(poke_name):
    conn = sqlite3.connect(DB_FILENAME)
    cursor = conn.cursor()

    if not table_exists(cursor):
        load_csv_to_db()

    cursor.execute('''SELECT * FROM Pokemons WHERE name = ?''', 
                    (poke_name,))

    return cursor.fetchone()
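
The ? placeholder in the query is worth a closer look: the value is handed to SQLite as data and never spliced into the SQL text, so odd input cannot change the query. A minimal sketch, using an in-memory table with two rows and a hypothetical helper called find_by_name:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Pokemons (idx int, name text)")
conn.executemany("INSERT INTO Pokemons VALUES (?, ?)",
                 [(25, "Pikachu"), (26, "Raichu")])


def find_by_name(name):
    """Look up one row; the ? placeholder passes name as data, not SQL."""
    return conn.execute(
        "SELECT * FROM Pokemons WHERE name = ?", (name,)
    ).fetchone()


hit = find_by_name("Pikachu")
# A classic injection attempt is just an unusual name that matches nothing.
miss = find_by_name("Pikachu' OR '1'='1")
print(hit, miss)
```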

The next query retrieves a list of pokemons based on their types. Each pokemon has a primary and a secondary type, and we will be able to query for both types or just the primary one.

def get_poke_by_type(type1, type2=None):
    conn = sqlite3.connect(DB_FILENAME)
    cursor = conn.cursor()

    if not table_exists(cursor):
        load_csv_to_db()

    if type2:
        cursor.execute('''
        SELECT * FROM Pokemons WHERE type1 = ? AND type2 = ?''', 
        (type1, type2))

    else:
        cursor.execute('''
        SELECT * FROM Pokemons WHERE type1 = ?''', (type1,))
 
    return cursor.fetchall()
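
Both query functions return plain tuples, so callers have to remember which position holds which column. If you would rather read columns by name, sqlite3 offers a row factory for that. A minimal sketch on a cut-down in-memory table (only five of the columns, to keep it short):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows now support access by column name
conn.execute(
    "CREATE TABLE Pokemons (idx int, name text, type1 text, type2 text, hp int)"
)
conn.execute(
    "INSERT INTO Pokemons VALUES (1, 'Bulbasaur', 'Grass', 'Poison', 45)"
)

row = conn.execute(
    "SELECT * FROM Pokemons WHERE type1 = ? AND type2 = ?", ("Grass", "Poison")
).fetchone()

by_name = {"name": row["name"], "hp": row["hp"]}
by_index = (row[1], row[4])  # positional access still works
print(by_name, by_index)
```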

The goal of the two queries above was to retrieve pokemons based on some parameters.
The next three queries will be responsible for adding, updating, and deleting pokemons.

Adding Pokemons

def add_poke_to_db(name, type1, type2, sum_stats, hp, attack, 
                   special_attack,defense, special_defense):  
  
    conn = sqlite3.connect(DB_FILENAME)
    cursor = conn.cursor()

    if not table_exists(cursor):
        load_csv_to_db()


    cursor.execute('''
        INSERT INTO Pokemons ('name', 'type1', 'type2', 'sum_stats',  
                          'hp', 'attack', 'special_attack', 
                          'defense', 'special_defense')
                           VALUES (?,?,?,?,?,?,?,?,?)''', 
                           (name, type1, type2, sum_stats, hp, 
                           attack, special_attack, defense, 
                           special_defense))
    conn.commit()

The function above adds a new row to the database with the parameters passed.
We have to commit our changes to make sure that the new state of the database is written out.
If we do not commit, the insert stays inside an open transaction: other connections cannot see it, and it is discarded when the connection closes. A query for the new pokemon by name, which opens its own connection, may then report that the pokemon is not found.
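
Here is a small sketch of why the commit matters. It opens two independent connections to the same temporary database file (commit_demo.db is a made-up name) and counts the rows the second connection can see before and after the first one commits.

```python
import sqlite3
import tempfile
from pathlib import Path


def count_rows(conn):
    """Number of rows in Pokemons as seen by this connection."""
    return conn.execute("SELECT count(*) FROM Pokemons").fetchone()[0]


with tempfile.TemporaryDirectory() as tmp_dir:
    db_path = Path(tmp_dir) / "commit_demo.db"  # hypothetical file name

    writer = sqlite3.connect(db_path)
    writer.execute("CREATE TABLE Pokemons (name text)")
    writer.commit()

    reader = sqlite3.connect(db_path)  # a second, independent connection

    writer.execute("INSERT INTO Pokemons VALUES ('Pikachu')")
    seen_before_commit = count_rows(reader)  # insert not committed yet

    writer.commit()
    seen_after_commit = count_rows(reader)

    writer.close()
    reader.close()

print(seen_before_commit, seen_after_commit)
```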

Updating Pokemons

def update_poke(name, type1=None, type2=None, sum_stats=None,  
                hp=None, attack=None, special_attack=None, 
                defense=None, special_defense=None):

    conn = sqlite3.connect(DB_FILENAME)
    cursor = conn.cursor()
    
    if not table_exists(cursor):
        load_csv_to_db()

    params = [type1, type2, sum_stats, hp, attack, special_attack,
              defense, special_defense]

    params_names = ['type1', 'type2', 'sum_stats', 'hp', 'attack',
                    'special_attack', 'defense', 'special_defense']

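    for param, param_name in zip(params, params_names):
        # Compare against None so legitimate values such as 0 are not skipped
        if param is not None:
            # param_name comes from the fixed list above, never from user input
            query = '''
                    UPDATE Pokemons SET ''' + param_name + '''
                    = ? WHERE name = ?'''
            cursor.execute(query, (param, name))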

    conn.commit()

Since we want to allow updating only part of a pokemon’s information, we go over the parameters one by one, check which of them were passed, and update only those in the database.
For the same reason mentioned above, we commit at the end.
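
An alternative to running one UPDATE per parameter is to build a single statement that sets all the passed columns at once. This is only a sketch of that idea, not a replacement for the function above: build_update and UPDATABLE_COLUMNS are names I made up, and the demo table contains just a few of the columns.

```python
import sqlite3

# Only these column names may appear in the generated SQL.
UPDATABLE_COLUMNS = ("type1", "type2", "sum_stats", "hp", "attack",
                     "special_attack", "defense", "special_defense")


def build_update(name, **changes):
    """Return (sql, params) updating only the columns that are not None."""
    to_set = {col: val for col, val in changes.items()
              if col in UPDATABLE_COLUMNS and val is not None}
    if not to_set:
        return None, ()
    assignments = ", ".join(f"{col} = ?" for col in to_set)
    sql = f"UPDATE Pokemons SET {assignments} WHERE name = ?"
    return sql, (*to_set.values(), name)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Pokemons (name text, type1 text, hp int, attack int)")
conn.execute("INSERT INTO Pokemons VALUES ('Pikachu', 'Electric', 35, 55)")

sql, params = build_update("Pikachu", hp=40, attack=None)
conn.execute(sql, params)
conn.commit()
row = conn.execute("SELECT * FROM Pokemons WHERE name = 'Pikachu'").fetchone()
print(sql, params, row)
```

The column names still end up inside the SQL text, which is why they are checked against a fixed whitelist, while the values always go through ? placeholders.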

Deleting Pokemons

def delete_poke(name):
    conn = sqlite3.connect(DB_FILENAME)
    cursor = conn.cursor()

    if not table_exists(cursor):
        load_csv_to_db()

    cursor.execute('''DELETE FROM Pokemons WHERE name = ?''',
                   (name,))
    conn.commit()

Deleting a row based on a pokemon name, pretty self-explanatory.
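
If you ever want the delete function itself to report whether anything was removed, the cursor’s rowcount attribute holds the number of affected rows. A minimal sketch, with a hypothetical delete_by_name helper and an in-memory table:

```python
import sqlite3


def delete_by_name(conn, name):
    """Delete matching rows and report how many were removed."""
    cursor = conn.execute("DELETE FROM Pokemons WHERE name = ?", (name,))
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Pokemons (name text)")
conn.execute("INSERT INTO Pokemons VALUES ('Pikachu')")

first = delete_by_name(conn, "Pikachu")
second = delete_by_name(conn, "Pikachu")  # already gone
print(first, second)
```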


API

After preparing all the database functionality we are going to use, the hardest part is behind us.
All we have to do now is wrap our database functions and return some status codes.

Let’s create our second file, pokeapi.py, and continue coding.

Starting with the imports

from typing import Optional
from fastapi import FastAPI, Path, HTTPException, status
from pydantic import BaseModel
# Parentheses let the import list span several lines
from database import (get_poke_by_name, get_poke_by_type,
                      add_poke_to_db, update_poke, delete_poke)

To get our API off to a quick start, let’s add a welcome message for its root path.

app = FastAPI()

@app.get("/")
def root():
    raise HTTPException(status_code=status.HTTP_200_OK,  
                        detail="Welcome to PokeAPI")

Although it’s a short snippet, there are a few concepts I will go over because they will come up again in the next snippets.

First of all, what does app.get("/") do?
It defines a GET request on the root path of our API, “/”.
The function defined right after it is called whenever this path is accessed.

Second, instead of returning custom JSON messages with default status codes, we raise exceptions using the HTTPException class.
As arguments, we pass the status code we want to return and an informative message.
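
If the decorator syntax feels like magic, this toy sketch may help: it shows how a get("/")-style decorator can register a function in a routing table that is consulted later. It is not how FastAPI is implemented internally; ROUTES, get, and dispatch are hypothetical names made up for the illustration.

```python
# Toy stand-in for a web framework's routing table (not FastAPI's internals).
ROUTES = {}


def get(path):
    """Register the decorated function as the handler for GET <path>."""
    def decorator(handler):
        ROUTES[("GET", path)] = handler
        return handler
    return decorator


@get("/")
def root():
    return {"detail": "Welcome to PokeAPI"}


def dispatch(method, path):
    """Call the registered handler, or report 404 if there is none."""
    handler = ROUTES.get((method, path))
    if handler is None:
        return 404, {"detail": "Not Found"}
    return 200, handler()


print(dispatch("GET", "/"))
print(dispatch("GET", "/missing"))
```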


It’s time for our first real API call.

@app.get("/poke/{pokemon_name}")
def get_pokemon_by_name(
        pokemon_name: str = Path(
            ...,  # path parameters are always required
            description="Name of the pokemon you'd like to retrieve")):

    # Row layout: (idx, name, type1, type2, sum_stats, hp, attack,
    #              special_attack, defense, special_defense)
    pokemon = get_poke_by_name(pokemon_name)
    if not pokemon:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail="Pokemon not found")

    return {"Pokemon": pokemon[1],
            "Types": [pokemon[2], pokemon[3]],
            "HP": pokemon[5],
            "Attack": pokemon[6],
            "Special Attack": pokemon[7],
            "Defense": pokemon[8],
            "Special Defense": pokemon[9],
            }

To understand this snippet, let’s get introduced to path parameters.

Path parameters are parameters that the user passes through the path of the request.
In our function, pokemon_name is a path parameter, which means that if a user requests <server>/poke/Pikachu we will enter the function get_pokemon_by_name with the parameter pokemon_name="Pikachu".

We can specify that a parameter of the function is a path parameter by using the Path class. A path parameter is always required because it is part of the URL, so the first argument is ... (Ellipsis) rather than a default value such as None; recent FastAPI versions do not accept a default value here.

In the body of the function, we first check whether the requested pokemon exists; if not, we raise an exception notifying the user that we haven’t found it.
If we did find it, we build a custom JSON response with the information we would like to return.
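
One practical detail on the client side: since the name travels inside the URL, names containing spaces or other special characters have to be percent-encoded. A minimal sketch using only the standard library; BASE_URL assumes the default local uvicorn address, pokemon_url is a hypothetical helper, and “Mr. Mime” is just an example of a name with a space in it.

```python
from urllib.parse import quote, unquote

BASE_URL = "http://127.0.0.1:8000"  # assumed local uvicorn address


def pokemon_url(pokemon_name):
    """Build the URL for GET /poke/{pokemon_name}, escaping unsafe characters."""
    return f"{BASE_URL}/poke/{quote(pokemon_name, safe='')}"


url = pokemon_url("Mr. Mime")
print(url)
# Decoding the last path segment gives back the original name,
# which is what the function should receive as pokemon_name.
print(unquote(url.rsplit("/", 1)[1]))
```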

pokemon_name = “Pikachu”

The next API request is going to retrieve multiple pokemons based on their primary and secondary type.
We will allow querying just based on the primary type, or both types, which means that the secondary type is going to be optional.

@app.get("/poketype/{poke_type}")
def get_pokemon_by_type(
        poke_type: str = Path(
            ...,  # path parameters are always required
            description="Primary type of the pokemons you want to query"),
        type2: Optional[str] = None):

    pokemons = get_poke_by_type(poke_type, type2)
    if not pokemons:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail="No pokemon with this type")
    result = {}
    for idx, pokemon in enumerate(pokemons):
        # Row layout: (idx, name, type1, type2, sum_stats, hp, attack,
        #              special_attack, defense, special_defense)
        result[idx] = {"Pokemon": pokemon[1],
                       "Types": [pokemon[2], pokemon[3]],
                       "HP": pokemon[5],
                       "Attack": pokemon[6],
                       "Special Attack": pokemon[7],
                       "Defense": pokemon[8],
                       "Special Defense": pokemon[9],
                       }
    return result

This time poke_type is our path parameter, and type2 is an optional query parameter. You don’t have to annotate it with Optional like I did, but it is more readable.

Just as in the previous API call, we raise an exception if we didn’t find any pokemons of the specified types.

If we did find at least one pokemon, we go over the list we got and return a dictionary in which every item is itself a dictionary containing one pokemon’s stats.
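
From the caller’s point of view, the primary type goes into the path and the optional secondary type goes after a question mark as a query parameter. A small sketch of how such URLs could be built with the standard library; BASE_URL assumes the default local uvicorn address and poketype_url is a hypothetical helper.

```python
from urllib.parse import quote, urlencode

BASE_URL = "http://127.0.0.1:8000"  # assumed local uvicorn address


def poketype_url(poke_type, type2=None):
    """Build the URL for GET /poketype/{poke_type} with an optional type2."""
    url = f"{BASE_URL}/poketype/{quote(poke_type, safe='')}"
    if type2 is not None:
        url += "?" + urlencode({"type2": type2})
    return url


print(poketype_url("Grass"))
print(poketype_url("Grass", "Poison"))
```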

Primary type = “Grass”, Secondary type = “Poison”

We just took care of our two GET requests, so now is a good time to go over the POST, PUT, and DELETE requests, which will be used to add, update, and delete a pokemon.

The POST and PUT requests require a request body, unlike the query and path parameters we used before.
The request body is the object we are storing in the database: a Pokemon.
We will add a class that defines a pokemon with its attributes and their types.

class Pokemon(BaseModel):
    name: str
    primary_type: str
    secondary_type: str
    sum_stats: int
    hit_points: int
    attack_strength: int
    defensive_strength: int
    special_attack_strength: int
    special_defensive_strength: int

Now we can use this class in our requests:

@app.post("/newPoke/{pokemon_name}")
def create_pokemon(pokemon_name: str, pokemon: Pokemon):
    if get_poke_by_name(pokemon_name):
        raise HTTPException(
              status_code=status.HTTP_406_NOT_ACCEPTABLE, 
              detail="Pokemon already exists") 

    add_poke_to_db(pokemon.name, pokemon.primary_type, 
                   pokemon.secondary_type,
                   pokemon.sum_stats, pokemon.hit_points,
                   pokemon.attack_strength,
                   pokemon.special_attack_strength,
                   pokemon.defensive_strength,
                   pokemon.special_defensive_strength)

    raise HTTPException(
         status_code=status.HTTP_201_CREATED,  
         detail="Pokemon created successfully")

As you can see, this function receives a Pokemon object as a parameter, which includes all the details necessary to create a new pokemon in our database.

The function body is fairly simple: first, we check whether there is already a pokemon with the name we passed.
If there is no such pokemon, we create it using the function we wrote in the database part.
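
To give an idea of what a client has to send, here is a sketch that prepares the POST request with the standard library. The JSON keys follow the Pokemon class above; the pokemon itself (“Examplemon”) and its values are made up, BASE_URL assumes the default local uvicorn address, and build_create_request is a hypothetical helper. The request is only built here, not sent.

```python
import json
from urllib.request import Request

BASE_URL = "http://127.0.0.1:8000"  # assumed local uvicorn address

# Field names follow the Pokemon model; the values are a made-up pokemon.
new_pokemon = {
    "name": "Examplemon",
    "primary_type": "Normal",
    "secondary_type": "Flying",
    "sum_stats": 300,
    "hit_points": 50,
    "attack_strength": 50,
    "defensive_strength": 50,
    "special_attack_strength": 75,
    "special_defensive_strength": 75,
}


def build_create_request(pokemon):
    """Prepare (but do not send) the POST request for /newPoke/{name}."""
    return Request(
        url=f"{BASE_URL}/newPoke/{pokemon['name']}",
        data=json.dumps(pokemon).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(new_pokemon)
print(request.get_method(), request.full_url)
# To actually send it while the server is running:
#   from urllib.request import urlopen
#   with urlopen(request) as response:
#       print(response.status, response.read().decode())
```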


Moving on to our update method

@app.put("/updatePoke/{pokemon_name}")
def update_pokemon(pokemon_name: str, pokemon: Pokemon):
    if not get_poke_by_name(pokemon_name):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
                           detail="Pokemon not found")

    # Update the pokemon named in the path, the one we just checked exists
    update_poke(pokemon_name, pokemon.primary_type,
                pokemon.secondary_type,
                pokemon.sum_stats, pokemon.hit_points,
                pokemon.attack_strength,
                pokemon.special_attack_strength,
                pokemon.defensive_strength,
                pokemon.special_defensive_strength)
    
    raise HTTPException(status_code=status.HTTP_200_OK,
                        detail="Pokemon details updated")

After seeing the previous method, this one probably seems pretty self-explanatory.
– Get the request body via the Pokemon parameter.
– Check if the pokemon we want to update exists.
– If the pokemon exists, send its parameters to the matching database function.

The database function only writes the values it is given and leaves the other columns as they were before the request. Note that every field of the Pokemon model above is required, so a request has to supply all of them; true partial updates would need those fields to be optional.


Congratulations, we have reached our final request.

@app.delete("/deletePoke/{pokemon_name}")
def delete_pokemon(pokemon_name: str):
    if not get_poke_by_name(pokemon_name):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail="Pokemon not found")
    delete_poke(pokemon_name)

    raise HTTPException(status_code=status.HTTP_200_OK,
                        detail="Pokemon deleted successfully")

Which by now probably seems pretty simple.


How to execute

To run our API and start playing around, go to your project folder and type in the terminal:

uvicorn pokeapi:app --reload

pokeapi: the name of the file where you defined your API (without the .py extension).

app: the name of the variable holding the FastAPI object.

Once you do, uvicorn should start the server and print its startup log in the terminal.

Now go to http://127.0.0.1:8000/docs and start playing around.


Conclusion

We learned how to create an API from scratch, integrated it with a database, and went through the most commonly used types of requests.

You can check out the full code at my GitHub.
