How To Build A Simple Data Pipeline on Google Cloud Platform
Here’s a demonstration of how to build a simple data pipeline using Google Cloud Platform services such as Google Cloud Storage (GCS), BigQuery, Google Cloud Function (GCF), and Google Cloud Composer.
Google Cloud Platform (GCP) is a suite of cloud computing services that brings together computing, data storage, data analytics, and machine learning capabilities to enable organizations to build data pipelines, secure data workloads, and carry out analytics. In this article, we will demonstrate how to build a simple data pipeline using GCP services.
For this particular use case, let’s pretend to be an imaginary company named LRC, Inc. that sells various products.
Requirement
Our sales department wants to provide a CSV file of recent sales. The file will come from our sales management system, and the data will be used for tracking sales and providing aggregated data to a downstream system. The file will contain a customer name, a product id, an item count, and an item cost.
The data should be stored over time and new data appended. As part of the process, a new file should be generated that will contain all order data aggregated by customer and product. The new file will be stored in a shared location so that another team can access it anytime.
When the file is delivered, an automated email should be sent to a specific address ([email protected]) that will open a ticket alerting the other team that a file is available.
Design
Our pipeline is fairly simple. We have several steps:
- Watch for a file
- Load a file into a database
- Create an aggregation from the data
- Create a new file
- Send an email
Our imaginary company is a GCP user, so we will be using GCP services for this pipeline. Even with restricting ourselves to GCP, there are still many ways to implement these requirements. As part of our technical requirements, we want to keep this as serverless as possible and write as little code as possible.
We will use Google Cloud Storage (GCS) to store the files. Our database will be Google’s big data distributed SQL database, BigQuery, and we will use Google’s managed Airflow service, Google Cloud Composer, for orchestration. We will also use a Google Cloud Function (GCF) to watch for file arrival. We could also use GCF to send the email at the final step, but we will put that in our Composer orchestration to keep things as contained as possible.
This article does not contain a tutorial for any of these services but will explain how the pieces fit together to create a pipeline. Describing each service in detail would require many articles. Google provides excellent tutorials, and I will reference some of those as we go through the process.
Implementation
GCP provides a very nice-looking UI for most commands and services. It follows Google’s minimalistic design and is pretty clean. However, to create a repeatable and automated deployment process, we should use the command line for most tasks related to pipeline development. The UI is great for exploration and analysis, but scripting is the rule for anything we need to automate. Our pipeline would first be created in a development environment, deployed later to a test environment, and finally to a production environment. Because of that, I will primarily use the command line interface (CLI).
Google Cloud Storage
The first step is to create a couple of buckets in GCS. We will follow the LRC, Inc. bucket naming standard: our company’s name, followed by the environment, followed by a descriptive name.
Note: GCS bucket names are globally unique. If you follow along, you cannot name your buckets the same as what I name them. You will need to choose your bucket names and remember them later in the process.
For our incoming file, we will name it lrc-dev-incoming-sales-data. I will refer to this later as “the incoming bucket”.
gsutil mb -l us-east1 gs://lrc-dev-incoming-sales-data/
For our outgoing file, we will name it lrc-dev-outgoing-sales-aggregate. I will refer to this later as “the outgoing bucket”.
gsutil mb -l us-east1 gs://lrc-dev-outgoing-sales-aggregate/
We can list the buckets with gsutil ls to be sure they were created.
Find more information on gsutil and buckets at the GSUtil GCP Page.
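The naming standard above (company, then environment, then a descriptive name) is easy to capture in a small helper so the same names can be derived for dev, test, and prod. The following is only a sketch: the function name is hypothetical, and the validation covers a simplified version of the GCS naming rules (3 to 63 characters, lowercase letters, digits, dashes, underscores, and dots, starting and ending with a letter or digit).

```python
import re

# Simplified GCS bucket naming check: 3-63 characters, lowercase letters,
# digits, dashes, underscores and dots, starting and ending with a letter or digit.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")


def bucket_name(company, env, description):
    """Build '<company>-<env>-<description>' and check it against the basic rules."""
    name = "-".join(part.strip().lower() for part in (company, env, description))
    if not _BUCKET_RE.fullmatch(name):
        raise ValueError(f"invalid bucket name: {name!r}")
    return name


if __name__ == "__main__":
    # Derive the incoming bucket name for each environment.
    for env in ("dev", "test", "prod"):
        print(bucket_name("lrc", env, "incoming-sales-data"))
```

The helper only builds and checks the name. Whether a name is actually available still depends on it being globally unique.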
BigQuery
We need a table in BigQuery to hold our data and will use the BQ CLI command to create this. BQ allows several ways to create a table. If you are familiar with JSON, you can use a JSON doc to describe the columns and data types. If you are from a database background, you will probably want to use SQL DDL to create the table. I will use DDL.
You could also defer creating the table and allow creation during the step in Composer that we will use to load the data. That command will create the table if it does not exist. However, I would rather create it as part of the deployment, so we will use BQ to do so. For the final aggregate table, I will let Composer create it to show how it works.
The BQ command is:
You can read more about BQ and creating tables at the GCP BQ page.
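As a rough illustration of the DDL approach described above, the sketch below assembles a CREATE TABLE statement and the bq query invocation that would run it. The dataset name, table name, column names, and column types are all assumptions made for this example; only the four fields themselves come from the requirements. The script prints the command rather than executing it.

```python
import shlex

# Hypothetical names: the dataset and table are assumptions, not from the article.
DATASET = "sales"
TABLE = "sales_data"

# Column names and types are assumptions based on the file description.
COLUMNS = [
    ("customer_name", "STRING"),
    ("product_id", "STRING"),
    ("item_count", "INT64"),
    ("item_cost", "FLOAT64"),
]


def create_table_ddl(dataset, table, columns):
    """Return a standard SQL CREATE TABLE statement for the given columns."""
    cols = ",\n  ".join(f"{name} {col_type}" for name, col_type in columns)
    return f"CREATE TABLE IF NOT EXISTS {dataset}.{table} (\n  {cols}\n)"


def bq_command(ddl):
    """Return the argument list for running the DDL with the bq CLI."""
    # Legacy SQL is switched off so that bq treats the statement as standard SQL.
    return ["bq", "query", "--use_legacy_sql=false", ddl]


if __name__ == "__main__":
    # Print a shell-quoted command that could be pasted into a deployment script.
    print(shlex.join(bq_command(create_table_ddl(DATASET, TABLE, COLUMNS))))
```

Generating the command from a column list keeps the table definition in one place, which can help when the same deployment runs against dev, test, and prod.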
Google Cloud Function (GCF)
For the GCF part, the function that watches for file arrival, we will use Python and an example provided by Google.
Navigate to the Triggering with GCF page in the Cloud Composer documentation. If you follow that documentation exactly as written (with one small deviation described below), you will end up with a cloud function that triggers a DAG when a file in GCS is finalized. That is the first step in our simple pipeline.
The one small deviation is that you will need to use the incoming bucket that you created above as the trigger bucket. You cannot use mine or the one in the documentation, because bucket names are globally unique.
Once finished with this step, you will have a DAG called trigger_response_dag.py with a DAG id of composer_sample_trigger_response_dag. We will use that DAG in the rest of the steps.
Composer (Airflow)
For the final step, we will create the DAG for Composer. We have four steps remaining: load the CSV file from GCS into BQ, create an aggregation, export the aggregated data from BQ to GCS, and finally, send an email.
Edit the DAG from the GCF step. Remove the one task (print_gcs_info) in the DAG. We will replace that with our own steps. We will need to add some imports to the existing imports. These imports will allow us to call the required operators.
Below the imports and just above the default_args declaration, add these 4 variables:
These are variables that we will use in the steps below. They should be fairly self-explanatory, and you will see how they are used shortly.
Now, inside the “with airflow.DAG()” section (the body of the DAG), replace the bash task with the following steps.
First, we want to read the file into BQ from GCS. We use the GoogleCloudStorageToBigQueryOperator operator for this:
The schema_fields parameter defines our table and the CSV file should match that format.
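To make the relationship between the schema and the file concrete, here is a plain-Python sketch. It defines a schema in the list-of-dicts shape that schema_fields expects (name, type, mode) and checks CSV lines against it. The column names and types are assumptions for illustration, and the check is only a local approximation of what BigQuery would accept or reject at load time.

```python
import csv
import io

# BigQuery-style schema: a list of dicts with name, type and mode.
# Column names and types here are assumptions for illustration.
schema_fields = [
    {"name": "customer_name", "type": "STRING", "mode": "NULLABLE"},
    {"name": "product_id", "type": "STRING", "mode": "NULLABLE"},
    {"name": "item_count", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "item_cost", "type": "FLOAT", "mode": "NULLABLE"},
]

# Map each type used above to a Python parser.
PARSERS = {"STRING": str, "INTEGER": int, "FLOAT": float}


def row_matches_schema(row, schema):
    """Return True if the row has one parseable value per schema column."""
    if len(row) != len(schema):
        return False
    try:
        for value, field in zip(row, schema):
            PARSERS[field["type"]](value)
    except ValueError:
        return False
    return True


def check_csv(text, schema):
    """Return the 1-based numbers of CSV lines that do not match the schema."""
    reader = csv.reader(io.StringIO(text))
    return [n for n, row in enumerate(reader, start=1)
            if not row_matches_schema(row, schema)]
```

A check like this could run before upload to catch malformed files early, since the pipeline itself does no validation.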
Notice the bucket="{{ dag_run.conf['bucket'] }}" and source_objects=["{{ dag_run.conf['name'] }}"] parameters. The GCF function passes the bucket and file name of the object uploaded to the incoming bucket to the DAG through dag_run.conf. Both operator parameters accept Jinja templates, and the template syntax shown here extracts the bucket name and file name at run time.
We could hard code the bucket name since we know what the incoming bucket is. It is better practice to get the bucket name from the function, though, as we may trigger this DAG from multiple buckets in the future, and we will likely have different bucket names for dev, test, and prod. The other variables above could be created as Airflow variables, and that is what I would recommend. That is beyond the scope of this article, though.
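The lookup that the two templates perform can be mimicked in plain Python. This sketch takes a dictionary shaped like the run configuration (with the bucket and name keys the templates read) and returns the values the load step would receive. The function name and the example file name are hypothetical, and in the real DAG Airflow renders the templates itself.

```python
def resolve_load_args(conf):
    """Return the operator arguments that the two templates would resolve to."""
    return {
        # Equivalent of "{{ dag_run.conf['bucket'] }}"
        "bucket": conf["bucket"],
        # Equivalent of ["{{ dag_run.conf['name'] }}"]
        "source_objects": [conf["name"]],
    }


# Hypothetical payload, shaped like what the function forwards for an uploaded file.
example_conf = {
    "bucket": "lrc-dev-incoming-sales-data",
    "name": "sales.csv",  # hypothetical file name
}

if __name__ == "__main__":
    print(resolve_load_args(example_conf))
```

Because the bucket comes from the event rather than from a constant, the same DAG code can serve dev, test, and prod without edits.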
Once this task is completed, the data will be in BigQuery. The next task uses the BigQueryOperator to aggregate the data into a new table for export. This table will be truncated if it already exists.
The query sums item_count and sums item_count * item_cost to provide the aggregates, grouped by customer name and product id.
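For readers who want to see the aggregation without running BigQuery, the same grouping can be sketched in plain Python. This is an equivalent of the logic described, not the query itself; the row layout (customer name, product id, item count, item cost) follows the requirements, and Decimal is used here only to keep the cost arithmetic exact.

```python
from collections import defaultdict
from decimal import Decimal


def aggregate(rows):
    """Group by (customer_name, product_id); sum item_count and item_count * item_cost."""
    totals = defaultdict(lambda: [0, Decimal("0")])
    for customer_name, product_id, item_count, item_cost in rows:
        entry = totals[(customer_name, product_id)]
        entry[0] += int(item_count)
        entry[1] += int(item_count) * Decimal(item_cost)
    return {key: (count, cost) for key, (count, cost) in totals.items()}


if __name__ == "__main__":
    # Made-up rows for illustration only.
    rows = [
        ("Acme", "P-100", "2", "9.99"),
        ("Acme", "P-100", "1", "9.99"),
        ("Globex", "P-200", "5", "1.50"),
    ]
    for key, (count, cost) in aggregate(rows).items():
        print(key, count, cost)
```

Because new data is appended to the source table on every load, rerunning the aggregation over the full table is what makes the totals grow over time.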
The third task is to write our new table out to GCS so that the other team can pick it up. We will name the file sales_data_aggregate_ with today’s date appended, using the today variable we declared above.
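One way to build that file name is sketched below. The sales_data_aggregate_ prefix comes from the text, while the date format (YYYYMMDD), the .csv extension, and the function names are assumptions made for this example.

```python
from datetime import date


def aggregate_file_name(day=None, prefix="sales_data_aggregate_"):
    """Return the export file name with the given (or today's) date appended."""
    day = day or date.today()
    # The YYYYMMDD format and .csv extension are assumptions.
    return f"{prefix}{day.strftime('%Y%m%d')}.csv"


def destination_uri(bucket, day=None):
    """Return the gs:// URI of the export file in the outgoing bucket."""
    return f"gs://{bucket}/{aggregate_file_name(day)}"


if __name__ == "__main__":
    print(destination_uri("lrc-dev-outgoing-sales-aggregate"))
```

A sortable date format keeps the files in chronological order when the other team lists the outgoing bucket.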
The last task is to send an email to the other team letting them know they can come pick up the file. We’ll send an email to “[email protected]”.
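The content of such a notification can be sketched with the standard library alone. In the DAG, the email task would take a recipient, a subject, and a body; the snippet below only builds an equivalent message and does not send anything. The recipient address, subject line, and wording are placeholders, not the values used by LRC, Inc.

```python
from email.message import EmailMessage


def build_notification(to_addr, file_uri):
    """Build (but do not send) a message announcing that the file is available."""
    msg = EmailMessage()
    msg["To"] = to_addr
    msg["Subject"] = "Sales aggregate file available"  # placeholder subject
    msg.set_content(f"A new aggregate file is available at {file_uri}.")
    return msg


if __name__ == "__main__":
    # tickets@example.com is a placeholder recipient.
    print(build_notification("tickets@example.com", "gs://bucket/file.csv"))
```

Including the file location in the body gives the ticketing system something actionable to attach to the ticket it opens.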
At the bottom of the DAG, add the execution line:
GCStoBQ >> Transform >> BQtoGCS >> SendEmail
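The >> syntax on that line is ordinary Python operator overloading. The toy class below is a stand-in, not Airflow code: it shows how a right-shift operator that records the downstream task and returns it makes a chain read left to right. The class and helper names are hypothetical.

```python
class Task:
    """Tiny stand-in for an Airflow task, only to show how >> builds a chain."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # so a >> b >> c chains left to right.
        self.downstream.append(other)
        return other


def execution_order(first):
    """Walk a linear chain from the first task and return the task ids in order."""
    order, task = [], first
    while task is not None:
        order.append(task.task_id)
        task = task.downstream[0] if task.downstream else None
    return order


GCStoBQ, Transform, BQtoGCS, SendEmail = (
    Task(name) for name in ("GCStoBQ", "Transform", "BQtoGCS", "SendEmail")
)
GCStoBQ >> Transform >> BQtoGCS >> SendEmail

if __name__ == "__main__":
    print(execution_order(GCStoBQ))
```

Each task runs only after the one to its left succeeds, which is why the email is sent last.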
We now have a completed pipeline. In the Airflow console, the final DAG should show the four tasks in sequence: GCStoBQ, Transform, BQtoGCS, and SendEmail.
For more Cloud Composer documentation and tutorials, check out the Google Cloud Composer Documentation.
Here is an example of a CSV to upload to the incoming bucket.
If you upload it several times, you will be able to see how the aggregate values increase over time.
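If you need a test file, a small one in the shape described in the requirements (customer name, product id, item count, item cost) can be generated with a few lines of Python. The rows below are made up for illustration. No header row is written, which is an assumption you should check against how your load step is configured.

```python
import csv
import io

# Made-up rows for illustration only: customer name, product id, item count, item cost.
SAMPLE_ROWS = [
    ("Acme Corp", "P-100", 2, "9.99"),
    ("Acme Corp", "P-200", 1, "24.50"),
    ("Globex", "P-100", 5, "9.99"),
]


def sample_csv(rows=SAMPLE_ROWS):
    """Return the rows as CSV text with no header row."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerows(rows)
    return buf.getvalue()


if __name__ == "__main__":
    # Redirect the output to a file, then copy that file to the incoming bucket.
    print(sample_csv(), end="")
```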
Bottom Line
That’s all it takes to create a fairly simple but fully-featured data pipeline. We ingested data, transformed it, extracted the new data, and sent a notification. Note that there is no data validation or error handling, both of which you would want in a production system. On GCP, logging is pretty much automatic.
Data pipelines can become very complex. While this is a simple data pipeline, it is indicative of the major steps most batch pipelines include.