A Beginner’s Guide to Building a Basic Data Pipeline with Kestra

For junior data analysts and engineers, learning to build data pipelines can seem overwhelming. The trick is to start with simple pipelines and work up to more complex ones. These days, several tools make the process easy and seamless. This article introduces Kestra and shows you how to build a simple data pipeline that loads CSV data into a MySQL database.

Kestra

Kestra is an event-driven orchestrator for creating, running, scheduling, and monitoring a variety of complex pipelines. It lets you define workflows declaratively in YAML and ships with an intuitive user interface, making it accessible to users of varying skill levels (including non-coders).

Getting Started

To get started with Kestra, follow these steps:

Ensure you have Docker Desktop downloaded and installed on your system.

Download the Docker Compose file for Kestra using the following command:

curl -o docker-compose.yml https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml

Ensure that Docker is up and running. Then, run the following command to start the server:

docker compose up -d

Finally, open localhost:8080 in your browser to access the Kestra user interface.
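
If the UI does not come up, you can check whether the containers started correctly before retrying:

docker compose ps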

That’s it! You have successfully launched Kestra.

Building Your First Data Pipeline

One of Kestra’s many great features is its extensive list of plugin integrations for several databases, including MySQL, PostgreSQL, DuckDB, Snowflake, and many more; see the plugins documentation for details. Also useful is the ability to write your code/flows directly in the flow editor, so you do not need an external IDE to use Kestra.

For this tutorial, we will write a flow that loads a CSV file into a MySQL database using the Kestra MySQL plugin. We will work with a simple CSV file of order data with just a few columns: order_id, customer_name, customer_email, product_id, price, quantity, and total.
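
The actual values live in the file linked in the flow below; its rows look roughly like this (illustrative data, not the real file contents):

order_id,customer_name,customer_email,product_id,price,quantity,total
1,Jane Doe,jane@example.com,101,19.99,2,39.98
2,John Smith,john@example.com,102,5.50,4,22.00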

The first step is to create a database on our MySQL server to hold the data. In this case, ours will be called “tutorial”.
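
If you have a MySQL client open, a single statement does the job:

CREATE DATABASE IF NOT EXISTS tutorial;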

Then, we will write a flow that loads our CSV file into the database. The flow comes together in several parts.

First, we create a namespace for our flow; think of it as a folder for your flows:

id: data_pipeline
namespace: io.kestra.tutorial
description: |
  # Building your first pipeline

Here, we provided an ID (data_pipeline) and a namespace (io.kestra.tutorial), then wrote a description for readability.

Next, we create a task that downloads our CSV data. For this, Kestra provides a download plugin:

tasks:
  - id: extract
    type: io.kestra.plugin.fs.http.Download
    uri: https://raw.githubusercontent.com/kestra-io/examples/main/datasets/orders.csv

Here, we gave the task an id, extract, specified its type (the Kestra Download plugin), and set the uri of the CSV file we want to download. The Download plugin fetches the file into Kestra's internal storage.

Next, we write a task that enables loading data from local files into MySQL. This is necessary because, by default, MySQL does not allow you to load data from local files into a database.

  - id: enableLocalFiles
    type: io.kestra.plugin.jdbc.mysql.Query
    sql: SET GLOBAL local_infile=1;

Here, we use Kestra’s MySQL Query plugin to run an SQL statement that allows MySQL to load data from local files (note that SET GLOBAL requires sufficient privileges on the server). As you may have figured out already, this task is titled “enableLocalFiles”.

Still using the Query plugin, we will create a table to accommodate our CSV file. Here is what it looks like:

  - id: createTable
    type: io.kestra.plugin.jdbc.mysql.Query
    sql: |
      create table if not exists Sales
      (
          order_id       integer,
          customer_name  varchar(50),
          customer_email varchar(50),
          product_id     integer,
          price          real,
          quantity       integer,
          total          real
      );

Notice how we can write regular SQL directly in the flow editor using the Query plugin. We have created a Sales table with seven columns of different data types to accommodate our CSV file.

Now that we have successfully created a table, let's load our data into it:

  - id: loadData
    type: io.kestra.plugin.jdbc.mysql.Query
    inputFile: "{{ outputs.extract.uri }}"
    sql: |
      LOAD DATA LOCAL INFILE '{{ inputFile }}'
      INTO TABLE Sales
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      IGNORE 1 ROWS;

Notice something different about this task: it has an inputFile property, which is linked to the uri output of our extract task (the task that downloaded the CSV). inputFile now points to the downloaded CSV file, and our SQL query loads it into the table with LOAD DATA LOCAL INFILE '{{ inputFile }}'. You can read more about inputs and outputs in Kestra’s documentation.
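
To confirm the load worked, you could append one more query task. This is a sketch: the fetchOne property shown here comes from the JDBC Query plugins, but property names can vary between Kestra versions, so check the plugin documentation for yours.

  - id: verifyLoad
    type: io.kestra.plugin.jdbc.mysql.Query
    sql: SELECT COUNT(*) AS row_count FROM Sales;
    fetchOne: true

The row count would then show up in the task's outputs in the execution view.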

We have now successfully written a flow that loads a CSV file into a MySQL table. The final step is to link it to our database. For this, we create taskDefaults, which let us define default properties applied to every task of a given type, saving us the pain of repeatedly typing them out.

taskDefaults:
  - type: io.kestra.plugin.jdbc.mysql.Query
    values:
      url: jdbc:mysql://127.0.0.1:56982/tutorial
      username: mysql_user
      password: mysql_passwd

Using the MySQL Query plugin type, we specified the JDBC connection URL for the database, along with our MySQL server username and password. This gives Kestra access to our database (tutorial) and allows us to run SQL queries against it. Adjust the port (56982 here) to match your own MySQL server; also, if Kestra runs in Docker while MySQL runs on your host machine, you may need host.docker.internal in place of 127.0.0.1.
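
One more detail worth knowing: the MySQL JDBC driver also disables local file loading on the client side by default. If the loadData task fails with a “local data loading is disabled” error, enabling it in the connection URL usually helps; allowLoadLocalInfile is a standard Connector/J parameter, but confirm it against your driver version:

url: jdbc:mysql://127.0.0.1:56982/tutorial?allowLoadLocalInfile=true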

You can read more about connection URL options in the MySQL Connector/J (JDBC driver) documentation.

And that is it! We are officially done creating a pipeline that loads a CSV file into a MySQL database. Here is what our full code looks like:

id: data_pipeline
namespace: io.kestra.tutorial
description: |
  # Building your first pipeline

tasks:
  - id: extract
    type: io.kestra.plugin.fs.http.Download
    uri: https://raw.githubusercontent.com/kestra-io/examples/main/datasets/orders.csv

  - id: enableLocalFiles
    type: io.kestra.plugin.jdbc.mysql.Query
    sql: SET GLOBAL local_infile=1;

  - id: createTable
    type: io.kestra.plugin.jdbc.mysql.Query
    sql: |
      create table if not exists Sales
      (
          order_id       integer,
          customer_name  varchar(50),
          customer_email varchar(50),
          product_id     integer,
          price          real,
          quantity       integer,
          total          real
      );

  - id: loadData
    type: io.kestra.plugin.jdbc.mysql.Query
    inputFile: "{{ outputs.extract.uri }}"
    sql: |
      LOAD DATA LOCAL INFILE '{{ inputFile }}' 
      INTO TABLE Sales 
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      IGNORE 1 ROWS;

taskDefaults:
  - type: io.kestra.plugin.jdbc.mysql.Query
    values:
      url: jdbc:mysql://127.0.0.1:56982/tutorial
      username: mysql_user
      password: mysql_passwd
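
Since Kestra can also schedule flows, you could run this pipeline automatically by appending a trigger to the flow. A minimal sketch, assuming a recent Kestra version (the Schedule trigger's type name has changed across versions, so check the triggers documentation):

triggers:
  - id: daily
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *"

This would run the flow every day at 9 AM.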

With this, you can try your hand at building pipelines (both simple and complex) with several other databases. As mentioned earlier, Kestra provides plugins for popular databases, giving you the flexibility to work with whichever one you choose. You are also in luck: Kestra now comes with blueprints, sample flows that show you how to write different kinds of flows and that can be modified and tailored to meet your specific requirements.

Conclusion

Kestra provides an easy solution for orchestrating many kinds of workflows, including data pipelines. As a young data engineer, paying attention to new solutions like this and learning how to use them gives you an edge in carrying out your tasks.