How to build ETL pipelines in Python

Most organizations today work with large volumes of data, and building an ETL pipeline from scratch for that data can be time-consuming and challenging.

ETL stands for Extract, Transform, Load. As part of the ETL process, data is Extracted, Transformed, and Loaded into Data Warehouses so that organizations can analyze it to make strategic decisions.

The following are the key steps performed in the ETL pipeline:

  • Extract: This process collects and integrates data from a variety of sources, including Databases, Data Lakes, CRMs, and others.
  • Transform: This is the most crucial phase in an ETL Pipeline. To make data analytics-ready, it must be properly gathered, sorted, cleaned, and pivoted in this step.
  • Load: This process involves importing structured or unstructured data from Data Lakes, Databases, and other sources into Data Warehouses so that Data Analysts or other users can gain deep insights easily.
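To make the three steps concrete, here is a minimal sketch that runs them end to end. It is only an illustration: the sample rows, the sales table, and the function names extract, transform, and load are assumptions made for this example, and an in-memory SQLite database stands in for a real Data Warehouse.

```python
import sqlite3

def extract(rows):
    """Extract: return raw records (a hard-coded list stands in for a source system)."""
    return list(rows)

def transform(records):
    """Transform: drop incomplete rows, tidy the names, and cast amounts to float."""
    cleaned = []
    for name, amount in records:
        if name is None or amount is None:
            continue  # skip rows that are missing a value
        cleaned.append((name.strip().title(), float(amount)))
    return cleaned

def load(records, connection):
    """Load: write the cleaned rows into the target table."""
    connection.execute("CREATE TABLE IF NOT EXISTS sales (customer TEXT, amount REAL)")
    connection.executemany("INSERT INTO sales (customer, amount) VALUES (?, ?)", records)
    connection.commit()

# hypothetical raw data with messy names and missing values
raw = [("  alice ", "10.5"), ("BOB", "3"), (None, "7"), ("carol", None)]

warehouse = sqlite3.connect(":memory:")  # stand-in for the Data Warehouse
load(transform(extract(raw)), warehouse)

loaded_rows = warehouse.execute(
    "SELECT customer, amount FROM sales ORDER BY customer"
).fetchall()
print(loaded_rows)  # [('Alice', 10.5), ('Bob', 3.0)]
```

Keeping each step in its own function makes it easier to swap a real source or target in later without touching the other two steps.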

Understanding the Importance of Python ETL

Python is one of the most popular and widely used programming languages, with applications in a variety of fields. It was named the TIOBE Programming Language of the Year for 2021.

Python’s flexible and dynamic nature makes it ideal for Deployment, Analysis, and Maintenance tasks. Python ETL is one of the crucial skills required in Data Engineering to build Data Pipelines, develop Statistical Models, and perform a thorough analysis on them.

It has become a popular tool for executing ETL processes due to its ease of use and robust libraries for accessing databases and storage systems. Many teams use Python for ETL & Data Engineering rather than an ETL tool as it is more versatile and powerful for these tasks.

The greatest benefit of Python over other programming languages is the simplicity of use in Data Mining, Data Science, Big Data, Artificial Intelligence, and Machine Learning.

Companies around the world use Python for their data to obtain insights, manage their operations, and keep everything running smoothly.

2 Easy Steps to Build a Python ETL Pipeline

In this part, you’ll learn the essential steps for building an ETL pipeline using Python. You’ll create a basic Data Pipeline that feeds data into a Microsoft SQL Server database from MySQL & Microsoft SQL Server Databases.

In order to set up the Python ETL script, follow the steps below:

Step 1: Install the Required Modules

To set up the Python ETL Pipeline, you’ll need to install the following modules:

  • Python to MySQL Connector: mysql-connector-python (Use pip install mysql-connector-python command to install)
  • Python to Microsoft SQL Server Connector: pyodbc (Use pip install pyodbc command to install)
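Once the installs finish, you may want to confirm that both connectors can actually be imported before writing any ETL code. The snippet below is a small sketch of one way to do that with the standard library. The helper name missing_packages and the REQUIRED_MODULES mapping are assumptions made for this example. Note that the import name (mysql.connector) differs from the pip package name (mysql-connector-python).

```python
import importlib.util

# import name -> pip package name
REQUIRED_MODULES = {
    "mysql.connector": "mysql-connector-python",
    "pyodbc": "pyodbc",
}

def missing_packages(required):
    """Return the pip package names whose modules cannot be found."""
    missing = []
    for module_name, package_name in required.items():
        try:
            found = importlib.util.find_spec(module_name) is not None
        except ModuleNotFoundError:
            # raised when the parent package (e.g. "mysql") is not installed
            found = False
        if not found:
            missing.append(package_name)
    return missing

not_installed = missing_packages(REQUIRED_MODULES)
if not_installed:
    print("Install first: pip install " + " ".join(not_installed))
else:
    print("All required modules are available")
```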

Step 2: Set Up the ETL Directory

After installing the above packages, create the following 4 Python files in your project directory:

  • db_credentials.py: This file holds the connection settings for all Source and Target Databases.
  • sql_queries.py: This file comprises the commonly used Database queries to extract and load data in string format.
  • etl.py: This file contains the operations needed to connect to the Databases and run the required queries.
  • main.py: This is the primary file that regulates the flow and execution of the Python ETL Pipeline.

A) db_credentials.py

All Source and Target Database connection settings should be included in this file. Source settings are kept in lists so that they can be quickly iterated when needed. The following is a sample Python script that defines the Database connection settings:

datawarehouse_name = 'your_dwh_name'
# sql-server (target db, datawarehouse)
datawarehouse_db_config = {
  'Trusted_Connection': 'yes',
  'driver': '{SQL Server}',
  'server': 'datawarehouse_sql_server',
  'database': '{}'.format(datawarehouse_name),
  'user': 'your_db_uname',
  'password': 'your_db_pword',
  'autocommit': True,
}
# source db > sql-server
sqlserver_db_config = [
  {
    'Trusted_Connection': 'yes',
    'driver': '{SQL Server}',
    'server': 'your_db_sql_server',
    'database': 'db_1st',
    'user': 'your_db_uname',
    'password': 'your_db_pword',
    'autocommit': True,
  }
]
# source db > mysql
mysql_db_config = [
  {
    'user': 'your_1_user',
    'password': 'your_1_pword',
    'host': 'db_connection_string_1',
    'database': 'db_1st',
  },
  {
    'user': 'your_2_user',
    'password': 'your_2_password',
    'host': 'db_connection_string_2',
    'database': 'db_2nd',
  },
]
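Because the source settings are plain lists of dictionaries, it is easy to loop over them and catch a missing key before any connection is attempted. The following is only a sketch: the helper find_incomplete_configs and the REQUIRED_MYSQL_KEYS set are assumptions made for this example, not part of the files described in this article.

```python
# keys each MySQL entry is expected to carry (an assumption based on the sample config)
REQUIRED_MYSQL_KEYS = {"user", "password", "host", "database"}

def find_incomplete_configs(configs, required_keys):
    """Return (database name, sorted missing keys) for every incomplete entry."""
    problems = []
    for config in configs:
        missing = sorted(required_keys - set(config))
        if missing:
            problems.append((config.get("database", "<unknown>"), missing))
    return problems

sample_configs = [
    {
        "user": "your_1_user",
        "password": "your_1_pword",
        "host": "db_connection_string_1",
        "database": "db_1st",
    },
    {
        # password left out on purpose to show what the check reports
        "user": "your_2_user",
        "host": "db_connection_string_2",
        "database": "db_2nd",
    },
]

problems = find_incomplete_configs(sample_configs, REQUIRED_MYSQL_KEYS)
print(problems)  # [('db_2nd', ['password'])]
```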

B) sql_queries.py

This file includes queries for extracting data from the Source Databases and loading it into the Target Database. The following script will help you perform this task:

# example queries, will be unique for different database platforms

sqlserver_extract = ('''
  SELECT sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  FROM sqlserver_1_table
''')
sqlserver_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  VALUES (?, ?, ?)  
''')
mysql_extract = ('''
  SELECT mysql_col_1, mysql_col_2, mysql_col_3
  FROM mysql_demo_table
''')
mysql_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  VALUES (?, ?, ?)  
''')

# Queries getting exported
class SqlQuery:
  def __init__(self, extract_query, load_query):
    self.extract_query = extract_query
    self.load_query = load_query
# create instances for SqlQuery class
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# creating a list for iterating through values
mysql_queries = [mysql_query]
sqlserver_queries = [sqlserver_query]
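To see how an extract query and a load query work together as a pair, here is a small runnable sketch. It uses two in-memory SQLite databases as stand-ins for the source and target, so the table and column names are assumptions made for this example. SQLite accepts the same ? placeholders that pyodbc uses, which is why both load queries above use ? even though one source is MySQL: they run against the SQL Server target. The class is written as a dataclass here purely for brevity.

```python
import sqlite3
from dataclasses import dataclass

@dataclass(frozen=True)
class SqlQuery:
    """Pairs the query that reads from a source with the one that writes to the target."""
    extract_query: str
    load_query: str

# hypothetical query pair for the demo tables created below
demo_query = SqlQuery(
    extract_query="SELECT src_col_1, src_col_2 FROM source_demo_table",
    load_query="INSERT INTO table_demo (col_1, col_2) VALUES (?, ?)",
)

# stand-in source database with two rows
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE source_demo_table (src_col_1 INTEGER, src_col_2 TEXT)")
source.executemany("INSERT INTO source_demo_table VALUES (?, ?)", [(1, "a"), (2, "b")])

# stand-in target database with an empty table
target = sqlite3.connect(":memory:")
target.execute("CREATE TABLE table_demo (col_1 INTEGER, col_2 TEXT)")

# extract with one half of the pair, load with the other
rows = source.execute(demo_query.extract_query).fetchall()
target.executemany(demo_query.load_query, rows)
target.commit()

copied = target.execute("SELECT col_1, col_2 FROM table_demo ORDER BY col_1").fetchall()
print(copied)  # [(1, 'a'), (2, 'b')]
```

The number and order of columns in the extract query must match the placeholders in the load query, since each fetched row is passed straight to executemany.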

C) etl.py

This file should include the code required to access the relevant Databases and execute the required queries. The following script will help you perform this task:

# python-based modules
import pyodbc
import mysql.connector

# name of the target Data Warehouse database, defined in db_credentials.py
from db_credentials import datawarehouse_name

def etl(query, source_cnx, target_cnx):
  # extract data from demo source database
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  data = source_cursor.fetchall()
  source_cursor.close()

  # load data into demo Data Warehouse db
  if data:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("USE {}".format(datawarehouse_name))
    target_cursor.executemany(query.load_query, data)
    print('data loaded to the demo Data Warehouse db')
    target_cursor.close()
  else:
    print('data is empty')
def etl_process(queries, target_cnx, source_db_config, db_platform):

  # configuring demo source database connection
  if db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  else:
    # raise instead of returning a string so the caller's try/except reports it
    raise ValueError('Error! unrecognised source database platform')
  # loop through sql queries
  for query in queries:
    etl(query, source_cnx, target_cnx)
  # close the source db connection
  source_cnx.close()
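The etl function above reads the whole result set into memory with fetchall. For larger tables, one possible variation is to move the rows in batches with the standard DB-API fetchmany call. The sketch below shows the idea with in-memory SQLite databases standing in for the real source and target. The function name etl_in_batches, the batch_size parameter, and the demo tables are assumptions made for this example.

```python
import sqlite3

def etl_in_batches(extract_query, load_query, source_cnx, target_cnx, batch_size=1000):
    """Copy rows from source to target in fixed-size batches; return the row count."""
    source_cursor = source_cnx.cursor()
    target_cursor = target_cnx.cursor()
    source_cursor.execute(extract_query)
    total = 0
    while True:
        batch = source_cursor.fetchmany(batch_size)
        if not batch:
            break  # no rows left in the result set
        target_cursor.executemany(load_query, batch)
        total += len(batch)
    source_cursor.close()
    target_cursor.close()
    return total

# stand-in source with five rows
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE source_demo_table (col_1 INTEGER)")
source.executemany("INSERT INTO source_demo_table VALUES (?)", [(i,) for i in range(5)])

# stand-in target with an empty table
target = sqlite3.connect(":memory:")
target.execute("CREATE TABLE table_demo (col_1 INTEGER)")

moved = etl_in_batches(
    "SELECT col_1 FROM source_demo_table ORDER BY col_1",
    "INSERT INTO table_demo (col_1) VALUES (?)",
    source,
    target,
    batch_size=2,
)
target.commit()
print(moved)  # 5
```

With a batch size of 2, the five rows are written in three executemany calls, so only a small slice of the data is held in memory at any time.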

D)  main.py

This file includes code to iterate through given credentials to connect to the database and execute the necessary ETL Python operations. The following script will help you perform this task:

# python-based modules
import pyodbc

# variables
from db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config
from sql_queries import sqlserver_queries, mysql_queries

# methods
from etl import etl_process
def main():
  print('starting the etl data process')
	
  # establish connection for SQL Server, desired destination storage
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # looping through credentials
  # Database > mysql
  for config in mysql_db_config: 
    try:
      print("loading db: " + config['database'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    except Exception as error:
      print("etl for {} has error".format(config['database']))
      print('error message: {}'.format(error))
      continue
	
  # Database > sql-server
  for config in sqlserver_db_config: 
    try:
      print("loading db: " + config['database'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    except Exception as error:
      print("etl for {} has error".format(config['database']))
      print('error message: {}'.format(error))
      continue

  target_cnx.close()
if __name__ == "__main__":
  main()

Conclusion

Great Work! You have gained a basic understanding of building a Python ETL Pipeline. You can now adapt the script to your own requirements by changing the databases and queries it uses.

To explore the Python ETL Tools widely used in the industry, read the Best Python ETL Tools blog.

Building an ETL pipeline from scratch for large volumes of data can be time-consuming and challenging. Enterprises also need to invest significant resources to build it and then keep up with high data volumes and schema changes.

So, instead of creating ETL scripts from scratch, you can leverage automated Data Pipelines such as Hevo.
