Using Data from a Data Pipeline in BigQuery

In my last post I wrote about building a data pipeline to land leads data from noCRM inside BigQuery, but I really didn’t get much into BigQuery itself.

To catch up on my last post, check it out here:

BigQuery is an ANSI SQL-based, managed data warehouse solution from Google. Since it's managed, you don't need to worry about scaling, and you only pay for what you use. It also comes loaded with additional features like BigQuery ML, which I'll get into later.

I built my tables using the bq command-line tool from the Cloud SDK, referencing a schema in JSON format (check out my repo, linked at the bottom of the page, for a sample file):

bq mk --table --schema ~/nocrm_leads.json --time_partitioning_field record_time folkloric-clock-161420:nocrm_raw.nocrm_leads

One point of interest here is the --time_partitioning_field flag. I load this data in batch to create a daily snapshot of the data as it exists at ingestion time, stamped in record_time. I needed this because one of the data consumers wanted to see how the leads were being managed over time. When a table is partitioned in BigQuery, you can select from a single partition. This reduces the amount of data that has to be processed, which increases performance and also reduces the cost of the query.

I can do this simply by adding the partition column to my WHERE clause, like so:

SELECT * FROM `folkloric-clock-161420.nocrm_raw.nocrm_leads`
WHERE record_time = '2019-06-27' LIMIT 1000

As a note, SELECT * is not best practice in BigQuery; only retrieve the columns you need, since selecting everything increases the cost of the query.
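
For example, a query along these lines pulls just a few columns from that same single-day partition, keeping both the bytes scanned and the partitions read to a minimum:

SELECT id, title, status
FROM `folkloric-clock-161420.nocrm_raw.nocrm_leads`
WHERE record_time = '2019-06-27'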

After querying and reviewing the data, I noticed something I had missed when originally looking at it.

I had noCRM user IDs, but not the actual user information.

So I needed to refactor.

Refactoring…

I refactor my code a lot, mostly because I'm learning as I go with Python. I probably would have let the code from my previous post live on as is, but because of the user ID issue above I needed more data from noCRM: the user information. This called for hitting an additional endpoint, 'users'.

The reason I chose to refactor this code specifically was that a lot of it was hard-coded for the leads data, and I wanted it to be more reusable if I continued to need more from noCRM. The pieces I wanted to address were:

  • Variables explicitly referencing leads
  • The JSON parser that mapped each element of the JSON to a new row object
  • A straightforward way in BigQuery to get the latest partition

Updating the variables was fairly straightforward. I created a new variable called endpoints that stores the names of the endpoints I want to hit, which lets base_url be strictly the address of the API. My table names also made this easy: they are nocrm_leads and nocrm_users, so the endpoint names fit right in.

Old

# noCRM vars
base_url = 'https://pandera-systems.nocrm.io/api/v2/leads'
headers = {'X-API-KEY': os.environ["nocrm_key"]}
params = {'limit': '3000'}
# gcp vars
dataset_id = 'nocrm_raw'
table_id = 'nocrm_leads'
record_time = datetime.now().strftime("%Y-%m-%d")

Refactor

# noCRM vars
base_url = 'https://pandera-systems.nocrm.io/api/v2/'
headers = {'X-API-KEY': os.environ["nocrm_key"]}
params = {'limit': '3000'}
# gcp vars
dataset_id = 'nocrm_raw'
record_time = datetime.now().strftime("%Y-%m-%d")
endpoints = ['leads', 'users']

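With the endpoints variable in place, the request side of the pipeline can loop over it instead of hard-coding a single URL. This is a minimal sketch of that idea, not the exact code from the repo; it assumes the variables above and the requests library:

import requests

for endpoint in endpoints:
    # Appends the endpoint name to the base API address,
    # e.g. https://pandera-systems.nocrm.io/api/v2/leads
    response = requests.get(base_url + endpoint, headers=headers, params=params)
    response.raise_for_status()
    data = response.json()
    # data is then parsed (see json_parse below) and loaded into the
    # matching nocrm_<endpoint> table
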
The JSON parser was a bit more of a learning curve for me. As you can see, I was able to streamline this section down from 39 to 13 lines. I kept the logic that modifies remind_time, since it does not arrive in the correct format, but I found that if I stopped rebuilding the JSON object field by field like I was originally doing, each row mapped to the BigQuery table just fine, and that also addressed the datetime issue I was having.

Old

def json_parse(jsonData):
    rows = []
    for line in jsonData:
        if line['remind_time'] is not None:
            remind_time = line['remind_time'] + ":00"
        else:
            remind_time = line['remind_time']
        row = {
            "id": line['id'],
            "title": line['title'],
            "pipeline": line['pipeline'],
            "step": line['step'],
            "step_id": line['step_id'],
            "status": line['status'],
            "amount": line['amount'],
            "probability": line['probability'],
            "currency": line['currency'],
            "starred": line['starred'],
            "remind_date": line['remind_date'],
            "remind_time": remind_time,
            "created_at": line['created_at'],
            "estimated_closing_date": line['estimated_closing_date'],
            "updated_at": line['updated_at'],
            "description": line['description'],
            "html_description": line['html_description'],
            "tags": line['tags'],
            "created_from": line['created_from'],
            "closed_at": line['closed_at'],
            "attachment_count": line['attachment_count'],
            "created_by_id": line['created_by_id'],
            "user_id": line['user_id'],
            "client_folder_id": line['client_folder_id'],
            "client_folder_name": line['client_folder_name'],
            "team_id": line['team_id'],
            "team_name": line['team_name'],
            "record_time": record_time,
        }
        rows.append(row)
    return(rows)

Refactor

def json_parse(jsonData):
    rows = []
    for row in jsonData:
        if 'remind_time' in row.keys() and row['remind_time'] is not None:
            row['remind_time'] = row['remind_time'] + ":00"
        row.update({"record_time": record_time})
        if 'teams' in row.keys():
            row['teams'] = []
        rows.append(row)
    print("Success: JSON parsed " + str(len(rows)) + " records")
    return(rows)

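Because the parsed rows now line up with the table schema, loading them is simple. One way to do it is a streaming insert with the Python BigQuery client, using the dataset_id and endpoint-based table names from earlier; this is a rough sketch rather than the exact load code, and a batch load job would work just as well:

from google.cloud import bigquery

client = bigquery.Client()

def load_rows(endpoint, rows):
    # Table names mirror the endpoint names: nocrm_leads, nocrm_users
    table = client.get_table(dataset_id + '.nocrm_' + endpoint)
    errors = client.insert_rows_json(table, rows)
    if errors:
        print("Error loading " + endpoint + ": " + str(errors))
    else:
        print("Success: loaded " + str(len(rows)) + " " + endpoint + " records")
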
The last portion of the refactor was to have a simpler mechanism for pulling the latest partition of a BigQuery table. To do this, I created a new table in BigQuery that just holds a single row with the last date the batch ingest ran.

def update_rt():
    update_query = (
        "UPDATE `folkloric-clock-161420.nocrm_raw.nocrm_latest_rt` "
        "SET record_time = '" + record_time +
        "' WHERE 1=1"
    )
    query_job = client.query(update_query, location="US")
    query_job.result()  # wait for the DML statement to finish
    print("Success: Record Time set to: " + record_time)

Then on the BigQuery side, I have two new views, v_nocrm_leads_cur and v_nocrm_users_cur, with the logic of:

SELECT *
FROM `folkloric-clock-161420.nocrm_raw.nocrm_leads`
WHERE record_time IN (
  SELECT record_time
  FROM `folkloric-clock-161420.nocrm_raw.nocrm_latest_rt`
)

This lets queries that only need the latest snapshot hit a single partition in the table, keeping their cost down.
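
If you would rather create these views with SQL than through the console, the same logic wraps directly in a CREATE OR REPLACE VIEW statement, along these lines:

CREATE OR REPLACE VIEW `folkloric-clock-161420.nocrm_raw.v_nocrm_leads_cur` AS
SELECT *
FROM `folkloric-clock-161420.nocrm_raw.nocrm_leads`
WHERE record_time IN (
  SELECT record_time
  FROM `folkloric-clock-161420.nocrm_raw.nocrm_latest_rt`
)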

Now to focus a little more on BigQuery and some of the features that really set it apart. I am not a data scientist; I do not know how to build models or use data science Python libraries and Jupyter notebooks to create really deep insights. But with BigQuery, I, and anyone else who knows SQL, can.

BigQuery ML lets people who know SQL build models using what they already know. There are no resources to stand up or manage; it is just SQL.

It still follows the same principles of building, training, evaluating, and predicting, but it makes each step very simple.

The documentation for it can be found here:

https://cloud.google.com/bigquery-ml/docs/bigqueryml-intro

As an example, I am going to build a model to predict how long an open lead will take to close. I'll start by creating the model, which also trains it at the same time.

CREATE MODEL `folkloric-clock-161420.nocrm_raw.days_to_close_model`
OPTIONS
  (model_type='linear_reg',
  input_label_cols=['days_to_close']) AS
SELECT
  days_to_close,
  user_id,
  team_name
FROM `folkloric-clock-161420.nocrm_raw.v_nocrm_leads_cur`
WHERE status = 'won'
AND pipeline = 'Sales Matrix'
AND RAND() < 0.1

I define the input label, which is the value I want to predict, in this case days_to_close, and select the features I want the model to learn from: user_id and team_name. The RAND() < 0.1 filter keeps a random sample of roughly 10% of the won leads for training.
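
One thing worth calling out: days_to_close is not a field the noCRM API returns directly, so it has to be derived from the lead timestamps somewhere upstream of the model. Assuming created_at and closed_at land as timestamps, a calculation along these lines would produce it:

DATE_DIFF(DATE(closed_at), DATE(created_at), DAY) AS days_to_close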

After it runs, I get some stats about the training, such as the loss per iteration and the run duration.
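
Those stats can also be pulled back with a query, via ML.TRAINING_INFO, if you would rather not dig through the console:

SELECT *
FROM ML.TRAINING_INFO(MODEL `folkloric-clock-161420.nocrm_raw.days_to_close_model`)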

Next the model is evaluated.

SELECT *
FROM ML.EVALUATE(MODEL `folkloric-clock-161420.nocrm_raw.days_to_close_model`,
  (
  SELECT
    days_to_close,
    user_id,
    team_name
  FROM
    `folkloric-clock-161420.nocrm_raw.v_nocrm_leads_cur`
  WHERE
    status = 'won'
    AND pipeline = 'Sales Matrix'))

And now I am ready to predict!

SELECT *
FROM ML.PREDICT(MODEL `folkloric-clock-161420.nocrm_raw.days_to_close_model`,
  (
  SELECT
    days_to_close,
    user_id,
    team_name
  FROM
    `folkloric-clock-161420.nocrm_raw.v_nocrm_leads_cur`
  WHERE
    status <> 'won'
    AND pipeline = 'Sales Matrix'))

Here are the results. Well, kind of: the marketing team did not give me approval to share the actual days-to-close predictions or employee names.

After looking at the results, though, I felt there were more features I could add. To do this, all I need to do is delete the model and add the new feature to my SELECT. In this case I am going to add a feature that indicates whether a reminder has been added to the lead. The thought process here is that if they are more engaged with the lead, maybe it will lead to better results.

So in my original CREATE MODEL statement I'm going to add a new feature:

CASE WHEN remind_date IS NULL THEN 0 ELSE 1 END AS has_reminder
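
Putting that together, the recreated model statement looks roughly like this. I have written it with CREATE OR REPLACE MODEL, which skips the explicit delete step:

CREATE OR REPLACE MODEL `folkloric-clock-161420.nocrm_raw.days_to_close_model`
OPTIONS
  (model_type='linear_reg',
  input_label_cols=['days_to_close']) AS
SELECT
  days_to_close,
  user_id,
  team_name,
  CASE WHEN remind_date IS NULL THEN 0 ELSE 1 END AS has_reminder
FROM `folkloric-clock-161420.nocrm_raw.v_nocrm_leads_cur`
WHERE status = 'won'
AND pipeline = 'Sales Matrix'
AND RAND() < 0.1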

From there I'll just repeat the process. Unfortunately, the results of the evaluation were not any better; they were actually worse. But it was a good test nonetheless.

It is important to note that there are costs associated with using BigQuery ML, especially the creation and subsequent training of a model. There is a free tier, though, which this scenario stays within.

You can find more of the pricing details here:

https://cloud.google.com/bigquery-ml/pricing

To recap, in this post I went over using BigQuery to read data and create views, modifying my pipeline, and creating machine learning models in BigQuery to predict how many days it will take a lead to close.

You can check out the latest version of my code here:

https://bitbucket.org/danielzagales/nocrm_pulls/src/master/

I have done all of this in a serverless architecture at an extremely low cost, because I am only paying for usage rather than standing up or managing actual infrastructure. It is pretty exciting to see how much of this innovation is being democratized. One thing BigQuery ML does not do, however, is improve the quality of the model; there is still plenty of data science work that should be performed to understand the data and create models that are optimized and accurate. But it certainly lowers the barrier to entry for machine learning and lets small and medium businesses have the same level of innovation as a large enterprise.

Stay tuned next time when I take the plunge into building a data pipeline in Dataflow using the Apache Beam framework to stream data from PubSub to BigQuery.
