about analyzing crime trends in your local area. You know that relevant data exists, and you have some basic analytical skills that you want to use to investigate it. However, this data changes frequently, and you want to keep your analysis up to date with the latest crime incidents without repeating your work. How can we automate this process?
Well, if you've stumbled upon this article, you're in luck! Together, we'll walk through how to create a data pipeline to extract local police log data and connect it to a visualization platform to examine local crime trends over time. For this article, we'll extract data on incidents reported to the Cambridge (MA) Police Department (CPD), and then visualize this data as a dashboard in Metabase.
Additionally, this article can serve as a general template for anyone looking to write ETL pipelines orchestrated in Prefect, and/or anyone who wants to connect Metabase to their data stores to create insightful analyses/reports.
Note: I have no affiliation with Metabase – we'll simply use Metabase as an example platform to create our final dashboard. There are many other viable alternatives, some of which are mentioned later in this article.
Background Knowledge
Before we dive into the pipeline, it'll be helpful to review the following concepts, or keep these links handy as a reference while you read.
Data of Interest
The data we'll be working with is a collection of police log entries, where each entry is a single incident reported to/by the CPD. Each entry contains detailed information describing the incident, including but not limited to:
- Date & time of the incident
- Type of incident that occurred
- The street where the incident took place
- A plaintext description of what happened

Check out the portal for more details about the data.
For tracking crime trends in Cambridge, MA, creating a data pipeline to extract this data is appropriate, since the data is updated daily (according to the city's website). If the data were updated less frequently (e.g. yearly), creating a data pipeline to automate this process wouldn't save us much effort: we could simply revisit the data portal at the end of each year, download the .csv, and complete our analysis.
Now that we've found the appropriate dataset, let's walk through the implementation.
ETL Pipeline
To go from raw CPD log data to a Metabase dashboard, our project will consist of the following major steps:
- Extracting the data using its corresponding API.
- Transforming it to prepare it for storage.
- Loading it into a PostgreSQL database.
- Visualizing the data in Metabase.
The data flow of our system will look like the following:

Our pipeline follows an ETL workflow, which means that we'll transform the data before loading it into PostgreSQL. This requires holding the data in memory while executing the transformations, which can be problematic for large datasets that are too big to fit in memory. In that case, we might consider an ELT workflow instead, where we transform the data inside the same infrastructure where it's stored. Since our dataset is small (<10k rows), this shouldn't be a problem, and we'll take advantage of the fact that pandas makes data transformation easy.
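If you ever want to double-check whether a dataset is small enough for this in-memory approach, pandas can report a DataFrame's footprint directly. A minimal sketch (the df argument stands in for whatever you've loaded):

import pandas as pd

def in_memory_footprint_mb(df: pd.DataFrame) -> float:
    """Approximate size of the DataFrame in memory, in megabytes (deep=True includes string contents)."""
    return df.memory_usage(deep=True).sum() / 1024 ** 2

# e.g. print(f"{in_memory_footprint_mb(results_df):.1f} MB") after the extract step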
ETL: Extract
We'll extract the CPD log data by requesting the dataset from the Socrata Open Data API. We'll use sodapy, a Python client for the API, to make the request.
We'll encapsulate this extraction code in its own file, extract.py.
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from prefect import task

@task(retries=3, retry_delay_seconds=[10, 10, 10]) # retry the API request in case of failure
def extract_data():
    '''
    Extract incident data reported to the Cambridge Police Department using the Socrata Open Data API.
    Return the incident data as a pandas DataFrame.
    '''
    # fetch the Socrata app token from .env
    # include this app token when interacting with the Socrata API to avoid request throttling, so we can fetch all of the incidents
    load_dotenv()
    APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN")
    # create a Socrata client to interact with the Socrata API (https://github.com/afeld/sodapy)
    client = Socrata(
        "data.cambridgema.gov",
        APP_TOKEN,
        timeout=30  # increase the timeout from the 10s default - sometimes it takes longer to fetch all of the results
    )
    # fetch all of the data, paginating over results
    DATASET_ID = "3gki-wyrb"  # unique identifier for the Cambridge Daily Police Log data (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    results = client.get_all(DATASET_ID)
    # convert to a pandas DataFrame
    results_df = pd.DataFrame.from_records(results)
    return results_df
Notes about the code:
- Socrata throttles requests if you don't include an app token that uniquely identifies your application. To fetch all of the results, we'll include this token in our request, and we'll keep it in a .env file so it stays out of our source code.
- We'll specify a 30-second timeout (instead of the 10-second default) when making our request to the Socrata API. In my experience with the API, fetching all of the results can take longer than 10 seconds, and 30 seconds was typically enough to avoid timeout errors.
- We'll load the fetched results into a pandas DataFrame, since we'll validate and transform this data using pandas.
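If you'd like to sanity-check the extraction on its own before wiring up the rest of the pipeline, a quick script like the following can help. This is a minimal sketch: it assumes your .env defines SOCRATA_APP_TOKEN, and it calls the undecorated function through the task's .fn attribute so no Prefect server is needed.

# quick_extract_check.py - ad-hoc sanity check for the extraction step (not part of the pipeline)
from extract import extract_data

if __name__ == "__main__":
    # call the underlying function directly, bypassing Prefect's task machinery
    df = extract_data.fn()
    print(f"Fetched {len(df)} incidents")
    print(df.head())    # eyeball a few rows
    print(df.dtypes)    # columns typically arrive as strings from the API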
ETL: Validate
Now, we'll run some basic data quality checks on the data.
The data is already fairly clean (which makes sense, since it's provided by the Cambridge Police Department). So, our data quality checks will act more as a "sanity check" that we didn't ingest anything unexpected.
We'll validate the following:
- All of the expected columns (as specified here) are present.
- All IDs are numeric.
- Datetimes follow ISO 8601 format.
- There are no missing values in columns that should contain data. Specifically, each incident should have a datetime, ID, type, and location.
We'll put this validation code in its own file, validate.py.
from datetime import datetime
from collections import Counter
import pandas as pd
from prefect import task

### UTILITIES

def check_valid_schema(df):
    '''
    Check whether the DataFrame contains the expected columns for the Cambridge Police dataset.
    Otherwise, raise an error.
    '''
    SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
    if Counter(df.columns) != Counter(SCHEMA_COLS):
        raise ValueError("Schema does not match the expected schema.")

def check_numeric_id(df):
    '''
    Convert 'id' values to numeric.
    If any 'id' values are non-numeric, replace them with NaN so they can be removed downstream in the data transformations.
    '''
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    return df

def verify_datetime(df):
    '''
    Verify that 'date_time' values follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
    Raise a ValueError if any of the 'date_time' values are invalid.
    '''
    df.apply(lambda row: datetime.fromisoformat(row['date_time']), axis=1)

def check_missing_values(df):
    '''
    Check whether there are any missing values in columns that require data.
    For police logs, each incident should have a datetime, ID, incident type, and location.
    '''
    REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
    for col in REQUIRED_COLS:
        if df[col].isnull().sum() > 0:
            raise ValueError(f"Missing values are present in the '{col}' attribute.")

### VALIDATION LOGIC

@task
def validate_data(df):
    '''
    Check that the data satisfies the following data quality checks:
    - the schema is valid
    - IDs are numeric
    - datetimes follow ISO 8601 format
    - no missing values in columns that require data
    '''
    check_valid_schema(df)
    df = check_numeric_id(df)
    verify_datetime(df)
    check_missing_values(df)
    return df
When implementing these data quality checks, it's important to think about how to handle checks that fail.
- Do we want our pipeline to fail loudly (e.g. raise an error/crash)?
- Should our pipeline handle failures silently? For instance, should it mark data known to be invalid so that it can be removed downstream?
We'll raise an error if:
- The ingested data doesn't follow the expected schema. It doesn't make sense to process the data if it doesn't contain what we expect.
- A datetime doesn't follow ISO 8601 format. There's no standard way to convert an incorrect datetime value to its corresponding correct format.
- An incident is missing any one of datetime, ID, type, or location. Without these values, the incident can't be described comprehensively.
For records that have non-numeric IDs, we'll fill them with NaN placeholders and then remove them downstream in the transformation step. These records don't break our analysis if we simply remove them.
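To make the "mark now, drop later" behavior concrete, here's a tiny illustration of what pd.to_numeric(..., errors='coerce') does to a made-up 'id' column (the values below are hypothetical):

import pandas as pd

# hypothetical IDs: two valid, one non-numeric
ids = pd.Series(["25001234", "25001235", "not-an-id"])

# errors='coerce' turns anything that can't be parsed into NaN instead of raising an error
print(pd.to_numeric(ids, errors="coerce"))
# 0    25001234.0
# 1    25001235.0
# 2           NaN
# dtype: float64   <- the NaN row is what remove_invalid_rows() drops later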
ETL: Transform
Now, we'll apply some transformations to our data to prepare it for storage in PostgreSQL.
We'll apply the following transformations:
- Remove duplicate rows, using the 'id' column to identify duplicates.
- Remove invalid rows. Some of the rows that failed the data quality checks were marked with a NaN 'id', so we'll remove those.
- Split the datetime column into separate year, month, day, and time columns. In our final analysis, we may want to analyze crime trends across these different time intervals, so we'll create these extra columns here to simplify our queries downstream.
We'll put this transformation code in its own file, transform.py.
import pandas as pd
from prefect import task

### UTILITIES

def remove_duplicates(df):
    '''
    Remove duplicate rows from the DataFrame based on the 'id' column. Keep the first occurrence.
    '''
    return df.drop_duplicates(subset=["id"], keep='first')

def remove_invalid_rows(df):
    '''
    Remove rows where the 'id' is NaN, as these IDs were identified as non-numeric.
    '''
    return df.dropna(subset=['id'])

def split_datetime(df):
    '''
    Split the date_time column into separate year, month, day, and time columns.
    '''
    # convert to datetime
    df['date_time'] = pd.to_datetime(df['date_time'])
    # extract year/month/day/time
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second
    return df

### TRANSFORMATION LOGIC

@task
def transform_data(df):
    '''
    Apply the following transformations to the passed-in DataFrame:
    - deduplicate records (keep the first occurrence)
    - remove invalid rows
    - split datetime into year, month, day, and time columns
    '''
    df = remove_duplicates(df)
    df = remove_invalid_rows(df)
    df = split_datetime(df)
    return df
ETL: Load
Now our data is ready to import into PostgreSQL.
Before we can import our data, we need to create our PostgreSQL instance. We'll create one locally using a compose file, which allows us to define and configure all of the services that our application needs.
services:
  postgres_cpd: # postgres instance for the CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on: # don't start pgAdmin until our postgres instance is running
      - postgres_cpd
volumes:
  pgdata_cpd: # all data for our postgres_cpd service will be stored here
There are two main services defined here:
- postgres_cpd: our PostgreSQL instance, where we'll store our data.
- pgadmin: a DB admin platform that provides a GUI we can use to query data in our PostgreSQL database. Not functionally required, but useful for checking the data in our database. For more information on connecting to your PostgreSQL database in pgAdmin, click here.
Let's highlight some important configuration for our postgres_cpd service:
- container_name: postgres_cpd_dev -> Our service will run in a container (i.e. an isolated process) named postgres_cpd_dev. Docker generates random container names if you don't specify one, so assigning a name makes it more straightforward to interact with the container.
- environment: -> We create a Postgres user from credentials stored in our .env file. Additionally, we create a default database, cpd_db.
- ports: -> Our PostgreSQL service will listen on port 5432 inside the container. However, we'll map port 5433 on the host machine to port 5432 in the container, allowing us to connect to PostgreSQL from our host machine via port 5433 (see the connection check sketched after this list).
- volumes: -> Our service will store all of its data (e.g. configuration, data files) under the following directory inside the container: /var/lib/postgresql/data. We'll mount this container directory to a named Docker volume stored on our local machine, pgdata_cpd. This allows us to persist the database data beyond the lifetime of the container.
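Once the compose file is in place, you can bring the stack up with docker compose up -d. A quick way to confirm the database is reachable from the host is a small connection check like the one below (a minimal sketch, assuming the same .env credentials and the 5433 port mapping defined above):

# check_db_connection.py - ad-hoc check that the postgres_cpd container is reachable from the host
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host="localhost",
    port="5433",  # host port mapped to the container's 5432
    database="cpd_db",
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD")
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])  # e.g. "PostgreSQL 16.x ..."
conn.close()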
Now that we've created our PostgreSQL instance, we can execute queries against it. Importing our data into PostgreSQL requires executing two queries against the database:
- One to create the table that will store the data.
- One to load our transformed data into that table.
Each time we execute a query against our PostgreSQL instance, we need to do the following:
- Establish a connection to PostgreSQL.
- Execute the query.
- Commit the changes & close the connection.
We'll put this load code in its own file, load.py.
from prefect import task
from sqlalchemy import create_engine
import psycopg2
from dotenv import load_dotenv
import os

# read the contents of .env, which contains our Postgres credentials
load_dotenv()

def create_postgres_table():
    '''
    Create the cpd_incidents table in the Postgres DB (cpd_db) if it doesn't exist.
    '''
    # establish a connection to the DB
    conn = psycopg2.connect(
        host="localhost",
        port="5433",
        database="cpd_db",
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )
    # create a cursor object to execute SQL
    cur = conn.cursor()
    # execute the query to create the table
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS cpd_incidents (
            date_time TIMESTAMP,
            id INTEGER PRIMARY KEY,
            type TEXT,
            subtype TEXT,
            location TEXT,
            description TEXT,
            last_updated TIMESTAMP,
            year INTEGER,
            month INTEGER,
            day INTEGER,
            hour INTEGER,
            minute INTEGER,
            second INTEGER
        )
    '''
    cur.execute(create_table_query)
    # commit the changes
    conn.commit()
    # close the cursor and connection
    cur.close()
    conn.close()

@task
def load_into_postgres(df):
    '''
    Load the transformed data, passed in as a DataFrame,
    into the 'cpd_incidents' table in our Postgres instance.
    '''
    # create the table to insert data into, if necessary
    create_postgres_table()
    # create an Engine object to connect to the DB
    engine = create_engine(f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@localhost:5433/cpd_db")
    # insert the data into the 'cpd_incidents' table in the Postgres DB (index=False keeps the DataFrame index out of the table)
    df.to_sql('cpd_incidents', engine, if_exists='replace', index=False)
Things to note about the code above:
- Similar to how we fetched our app token for extracting the data, we'll fetch our Postgres credentials from a .env file.
- To load the DataFrame containing our transformed data into Postgres, we'll use pandas.DataFrame.to_sql(). It's a simple way to insert DataFrame data into any database supported by SQLAlchemy (one trade-off of this approach is noted below).
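One design choice worth calling out: with if_exists='replace', pandas drops and recreates the table on every run, inferring the schema from the DataFrame's dtypes (so the schema from create_postgres_table(), including the primary key, doesn't survive the load). This keeps the logic simple because the API call always returns the full dataset. If you ever switch to incremental loads, an append-style variant like the hypothetical sketch below is one alternative, at the cost of handling duplicate IDs yourself:

# hypothetical incremental-load variant (not part of the pipeline above):
# append only the rows whose IDs aren't already in the table
import os
import pandas as pd
from sqlalchemy import create_engine, text

def append_new_incidents(df: pd.DataFrame) -> int:
    engine = create_engine(
        f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@localhost:5433/cpd_db"
    )
    # fetch the IDs we've already stored
    with engine.connect() as conn:
        existing_ids = pd.read_sql(text("SELECT id FROM cpd_incidents"), conn)["id"]
    new_rows = df[~df["id"].isin(existing_ids)]
    # append only the new rows, preserving the existing table and its constraints
    new_rows.to_sql("cpd_incidents", engine, if_exists="append", index=False)
    return len(new_rows)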
Defining the Data Pipeline
We've implemented the individual components of the ETL process. Now, we're ready to assemble these components into a pipeline.
There are many tools available for orchestrating pipelines defined in Python. Two popular options are Apache Airflow and Prefect.
For its simplicity, we'll define our pipeline using Prefect. We need to do the following to get started:
- Install Prefect in our development environment.
- Get access to a Prefect API server. Since we don't want to manage our own infrastructure to run Prefect, we'll sign up for Prefect Cloud.
For more information on Prefect setup, check out the docs.
Next, we add the following decorators to our code:
- @task -> Add this to each function that implements a component of our ETL pipeline (i.e. our extract, validate, transform, and load functions).
- @flow -> Add this to the function that assembles the ETL components into an executable pipeline.
If you look back at our extract, validate, transform, and load code, you'll see that we already added the @task decorator to those functions.
Now, let's define the ETL pipeline that executes these tasks. We'll put the following in a separate file, etl_pipeline.py.
from extract import extract_data
from validate import validate_data
from transform import transform_data
from load import load_into_postgres
from prefect import flow

@flow(name="cpd_incident_etl", log_prints=True) # our pipeline will appear as 'cpd_incident_etl' in the Prefect UI; all print output will be logged in Prefect
def etl():
    '''
    Execute the ETL pipeline:
    - Extract CPD incident data from the Socrata API
    - Validate and transform the extracted data to prepare it for storage
    - Import the transformed data into Postgres
    '''
    print("Extracting data...")
    extracted_df = extract_data()
    print("Performing data quality checks...")
    validated_df = validate_data(extracted_df)
    print("Performing data transformations...")
    transformed_df = transform_data(validated_df)
    print("Importing data into Postgres...")
    load_into_postgres(transformed_df)
    print("ETL complete!")

if __name__ == "__main__":
    # CPD data is expected to be updated daily (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    # Thus, we'll execute our pipeline every day (at midnight)
    etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *")
Things to note about the code:
- @flow(name="cpd_incident_etl", log_prints=True) -> this names our pipeline "cpd_incident_etl", which will be reflected in the Prefect UI. The output of all our print statements will be logged in Prefect.
- etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *") -> this creates a deployment of our pipeline, named "cpd-pipeline-deployment", that runs every day at midnight (see the ad-hoc run sketch after this list).
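If you want to trigger a one-off run for testing rather than waiting for the midnight schedule, you can call the flow like a regular function. A minimal sketch:

# run_once.py - trigger a single ad-hoc run of the pipeline
from etl_pipeline import etl

if __name__ == "__main__":
    etl()  # executes the flow immediately; the run is still tracked in the Prefect UI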


Visualizing the Data in Metabase
Now that we've created our pipeline to load our data into PostgreSQL, it's time to visualize it.
There are many approaches we could take to visualize our data. Two notable options are:
- Metabase
- Apache Superset
Both are good options. Without going into too much detail about each BI tool, we'll use Metabase to make a simple dashboard, for a couple of reasons:
- Metabase is an open-source BI and embedded analytics tool that makes data visualization and analysis simple.
- Connecting Metabase to our data sources and deploying it is straightforward compared to other BI tools (e.g. Apache Superset).
In the future, if we want more customization over our visuals/reports, we can consider other tools. For now, Metabase will do for creating a POC.
Metabase lets you choose between its cloud version and managing a self-hosted instance. Metabase Cloud offers several payment plans, but you can create a self-hosted instance of Metabase for free using Docker. We'll define our Metabase instance in our compose file.
- Since we're self-hosting, we also have to define the Metabase application database, which contains the metadata that Metabase needs to query your data sources (in our case, postgres_cpd).
services:
  postgres_cpd: # postgres instance for the CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
    networks:
      - metanet1
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on:
      - postgres_cpd
    networks:
      - metanet1
  metabase: # taken from https://www.metabase.com/docs/latest/installation-and-operation/running-metabase-on-docker
    image: metabase/metabase:latest
    container_name: metabase
    hostname: metabase
    volumes:
      - /dev/urandom:/dev/random:ro
    ports:
      - "3000:3000"
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabaseappdb
      MB_DB_PORT: 5432
      MB_DB_USER: ${METABASE_DB_USER}
      MB_DB_PASS: ${METABASE_DB_PASSWORD}
      MB_DB_HOST: postgres_metabase # must match the container name of postgres_mb (the Metabase Postgres instance)
    networks:
      - metanet1
    healthcheck:
      test: curl --fail -I http://localhost:3000/api/health || exit 1
      interval: 15s
      timeout: 5s
      retries: 5
  postgres_mb: # postgres instance for managing the Metabase instance
    image: postgres:16
    container_name: postgres_metabase # other services must use this name to communicate with this container
    hostname: postgres_metabase # internal identifier, doesn't impact communication with other services (helpful for logs)
    environment:
      POSTGRES_USER: ${METABASE_DB_USER}
      POSTGRES_DB: metabaseappdb
      POSTGRES_PASSWORD: ${METABASE_DB_PASSWORD}
    ports:
      - "5434:5432"
    volumes:
      - pgdata_mb:/var/lib/postgresql/data
    networks:
      - metanet1

# Here, we define separate volumes to isolate DB configuration & data files for each Postgres database.
# The Postgres DB for our application should store its config/data separately from the Postgres DB our Metabase service relies on.
volumes:
  pgdata_cpd:
  pgdata_mb:

# define the network over which all of the services will communicate
networks:
  metanet1:
    driver: bridge # 'bridge' is the default network driver - services can communicate with each other using their service names
To create our Metabase instance, we made the following changes to our compose file:
- Added two services: metabase (our Metabase instance) and postgres_mb (our Metabase instance's application database).
- Defined an additional volume, pgdata_mb. This will store the data for the Metabase application database (postgres_mb).
- Defined the network over which the services will communicate, metanet1.
Without going into too much detail, let's break down the metabase and postgres_mb services.
Our Metabase instance (metabase):
- This service will be exposed on port 3000 both on the host machine and inside the container. If we're running this service on our local machine, we'll be able to access it at localhost:3000.
- We connect Metabase to its application database by ensuring that the MB_DB_HOST, MB_DB_PORT, and MB_DB_DBNAME environment variables match the container name, port, and database name listed under the postgres_mb service.
For more information on how to run Metabase in Docker, check out the docs.
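Once the stack is up (docker compose up -d), a quick way to confirm that Metabase is running is to hit the same /api/health endpoint used by the compose healthcheck. A minimal sketch:

# check_metabase_health.py - confirm the Metabase container is up and healthy
import requests

resp = requests.get("http://localhost:3000/api/health", timeout=5)
print(resp.status_code, resp.json())  # expect 200 and a healthy status once Metabase has finished starting up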
After setting up Metabase, you'll be prompted to connect Metabase to your data source.

After selecting a PostgreSQL data source, we can specify the following connection string to connect Metabase to our PostgreSQL instance, substituting your credentials as necessary:
postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@postgres_cpd:5432/cpd_db

After establishing the connection, we can create our dashboard. You can create a wide variety of visuals in Metabase, so we won't go into the specifics here.
Let's revisit the example dashboard displayed at the beginning of this post. This dashboard nicely summarizes recent and historical trends in reported CPD incidents.

From this dashboard, we can see the following:
- Most incidents are reported to the CPD in the mid-to-late afternoon.
- An overwhelming majority of reported incidents are of the "INCIDENT" type.
- The number of reported incidents peaked around August-October of 2025 and has been decreasing steadily since.
Luckily for us, Metabase queries our database whenever we load this dashboard, so we won't have to worry about it displaying stale data.
Check out the Git repo here if you want to dive deeper into the implementation.
Wrap-up and Future Work
Thanks for reading! Let's briefly recap what we built:
- We built a data pipeline to extract, transform, and load Cambridge Police Log data into a self-hosted PostgreSQL database.
- We deployed this pipeline using Prefect and scheduled it to run daily.
- We created a self-hosted instance of Metabase, connected it to our PostgreSQL database, and built a dashboard to visualize recent and historical crime trends in Cambridge, MA.
There are many ways to build upon this project, including but not limited to:
- Creating additional visualizations (e.g. a geospatial heatmap) to show crime frequencies in different areas within Cambridge. This would require transforming the street location data into latitude/longitude coordinates.
- Deploying our self-hosted pipeline and services somewhere other than our local machine.
- Joining this data with other datasets for insightful cross-domain analysis. For instance, we could join this dataset with demographic/census data (using street location) to see whether areas of different demographic makeup within Cambridge have different incident rates.
If you have any other ideas for extending this project, or you would have built things differently, I'd love to hear about it in the comments!
The author created all images in this article.
Sources & GitHub
Prefect:
Metabase:
Docker:
GitHub Repo:
CPD Daily Police Log Dataset:
