Building a Fitness Pipeline in Google Cloud

In my previous posts, I covered how we set up a fitness program at Pandera, why we needed a custom solution to handle a little friendly competition, the supporting architecture, and how I would structure a Data Vault.

Designing a Fitness Leaderboard in GCP

Implementing a Data Vault in BigQuery

Now, though, we’re building the pipeline that takes the Strava data and gets it into BigQuery. As a reference point, here is the architecture I outlined in my first post:

Google Cloud Architecture

As I mentioned previously, my main focus in discussing this application's code is the Strava API call component, which serves as my data pipeline.

When we dissect the Strava API call in the diagram, we see it handles a few things.

  1. Receiving the PubSub message and routing the next action based on the update

As I mentioned earlier, the webhook doesn’t pass the actual event. What it does say is what kind of object it was (activity or athlete), what the corresponding ID is, and what kind of action it is (create, update, or delete). Given that content, I handle each message differently depending on the object type and the action.

I parse the pubsub message out into a few objects as follows:

# event notification from strava
aspect_type = pubsub_dict['aspect_type']
object_id = pubsub_dict['object_id']
owner_id = pubsub_dict['owner_id']
object_type = pubsub_dict['object_type']
event_time = pubsub_dict['event_time']
event_datetime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(event_time))
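
For context, here is a minimal sketch of how pubsub_dict might be produced before the fields above are read. It assumes the function is triggered by Pub/Sub and that the event carries the webhook body as base64-encoded JSON under a data key. The helper name parse_pubsub_event and the sample IDs are hypothetical and not part of the original code.

```python
import base64
import json


def parse_pubsub_event(event):
    """Decode a Pub/Sub-triggered event into a dict (hypothetical helper)."""
    # Pub/Sub delivers the message payload base64-encoded under 'data'.
    payload = base64.b64decode(event['data']).decode('utf-8')
    return json.loads(payload)


# Example event shaped like a Strava webhook notification (made-up IDs).
sample_body = {
    'aspect_type': 'create',
    'object_id': 123,
    'owner_id': 456,
    'object_type': 'activity',
    'event_time': 1577836800,
}
sample_event = {'data': base64.b64encode(json.dumps(sample_body).encode('utf-8'))}
pubsub_dict = parse_pubsub_event(sample_event)
```

Keeping the decoding in its own function makes it easy to feed the handler a hand-built event while testing locally.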

In this function I only really care about activities. From there, I handle create and update messages mostly the same way and deletes a bit differently:

if object_type == 'activity':
    now = time.time()
    if aspect_type == 'delete':
        athlete_activity_dict = [{
            'activity_hub_seq': hashlib.md5(str(object_id).encode()).hexdigest(),
            'sat_load_date': event_datetime,
            'delete_ind': True,
        }]
        sat_table_ref = bq_client.dataset('strava_datavault').table('activity_sat')
        bq_client.load_table_from_json(athlete_activity_dict, sat_table_ref)

As you can see above, if the change is a delete, I don’t do much other than generate a hash, take the event time from the Pub/Sub message, and insert both into the activity_sat table. I do that because I can no longer request that activity from the API, since it no longer exists, but it is still important to record that it was deleted.
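
The delete path can also be sketched as a small standalone function, which makes the shape of the tombstone row easier to see and to test without BigQuery. The function name build_delete_row is hypothetical; the field names and the MD5-of-ID hash follow the snippet above.

```python
import hashlib


def build_delete_row(object_id, event_datetime):
    """Build the satellite row that marks an activity as deleted."""
    return {
        # Same hub hash as for a live activity: MD5 of the activity ID.
        'activity_hub_seq': hashlib.md5(str(object_id).encode()).hexdigest(),
        # The webhook's event time stands in for the load date.
        'sat_load_date': event_datetime,
        'delete_ind': True,
    }


row = build_delete_row(123, '2020-01-01 00:00:00')
```

Because the hash is derived only from the activity ID, the delete row lines up with the earlier satellite rows for the same activity.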

When the record is not a delete (so either an update or a create), we make the request out to the Strava API. I first verify that my access token is still valid by checking the current time against the expiration of the token I have in Datastore. If the token is expired I get a new one; if not, I proceed with the one I have. I’m using a library called stravalib to simplify interacting with Strava. I am not thrilled with how a few things are implemented in it, but it sped things up a great deal. Long term, I will be removing any dependencies on that library.
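
As a rough illustration of the expiry check described above, here is a minimal sketch. It assumes expires_at is stored as epoch seconds. The function name token_is_expired and the buffer_s safety margin are hypothetical additions, not part of the original code.

```python
import time


def token_is_expired(expires_at, now=None, buffer_s=60):
    """Return True when the stored token has expired or is about to.

    expires_at: epoch seconds, as stored alongside the athlete.
    buffer_s:   hypothetical safety margin so a token is not used in
                its final seconds of validity.
    """
    if now is None:
        now = time.time()
    return now >= expires_at - buffer_s
```

The small buffer is a design choice: refreshing slightly early avoids a request failing because the token expired between the check and the API call.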

After I get the activity response back from Strava, I use one of stravalib’s prebuilt to_dict functions. As I mentioned, there are some things I do not like about this library. One in particular is that the to_dict() function strips out some of the ID fields, so you’ll notice I add them back in with the supplement variable.

Once that is cleared, I upload the dictionary to GCS. I organized files in the following way:


That way if someone wanted me to remove their data it is easy enough to do a mass delete, and if I needed to reprocess data, I could find it easily.

From there I create my rows for insert and load them into BigQuery. This really lends itself to using a streaming insert instead of a load job, but currently I am trying to avoid charges as much as possible.
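
To make the trade-off concrete, here is a minimal sketch of a writer that can use either path. It assumes bq_client is a google.cloud.bigquery.Client and relies on its load_table_from_json and insert_rows_json methods. The write_rows wrapper and its streaming flag are hypothetical and not part of the original pipeline.

```python
def write_rows(bq_client, table_ref, rows, streaming=False):
    """Write JSON rows to BigQuery via a load job or a streaming insert.

    bq_client is expected to be a google.cloud.bigquery.Client; it is
    passed in so the function can be exercised with a stand-in in tests.
    """
    if streaming:
        # insert_rows_json returns a list of per-row errors (empty on success).
        errors = bq_client.insert_rows_json(table_ref, rows)
        if errors:
            raise RuntimeError(f'streaming insert failed: {errors}')
        return None
    # load_table_from_json starts a batch load job; result() blocks until
    # the job finishes and raises if it failed.
    job = bq_client.load_table_from_json(rows, table_ref)
    job.result()
    return job
```

Waiting on result() is optional, but it surfaces load errors in the function's own logs instead of leaving them on the job.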

if aspect_type != 'delete':
    # stored athlete from datastore
    athlete_key = datastore_client.key('Athlete', owner_id)
    athlete = datastore_client.get(athlete_key)
    if now > athlete['expires_at']:
        access_token = strava_client.refresh_access_token(
            client_id=STRAVA_CLIENT_ID,
            # the remaining arguments are cut off in the original listing
        )
    # create new client for authenticated athlete
    athlete_client = Client(access_token=athlete['access_token'])
    activity = athlete_client.get_activity(object_id)
    activity_dict = activity.to_dict()
    # ID fields that to_dict() strips out, plus the load date
    supplement = {'athlete_id': owner_id,
                  'activity_id': object_id,
                  'load_date': event_datetime}
    activity_dict.update(supplement)
    # GCS Storage
    upload_blob(activity_dict, owner_id, object_id, event_time, aspect_type)
    # convert distances and speeds to miles / mph and durations to whole seconds
    converted_units = {
        'distance_mi': unithelper.mile(getattr(activity, 'distance', None)).get_num(),
        'average_speed_mi': unithelper.mph(getattr(activity, 'average_speed', None)).get_num(),
        'max_speed_mi': unithelper.mph(getattr(activity, 'max_speed', None)).get_num(),
        'elapsed_time_s': int(unithelper.timedelta_to_seconds(getattr(activity, 'elapsed_time', None))),
        'moving_time_s': int(unithelper.timedelta_to_seconds(getattr(activity, 'moving_time', None))),
    }
    activity_dict.update(converted_units)
    athlete_activity_obj = AthleteActivity(activity_dict)
    sat_athlete_activity = athlete_activity_obj.satellite()
    # BQ insert
    sat_table_ref = bq_client.dataset('strava_datavault').table('activity_sat')
    bq_client.load_table_from_json(sat_athlete_activity, sat_table_ref)

One thing you’ll see in the above code is a reference to AthleteActivity. I created a class to take in the data and model it the way I need it for my Data Vault. You can see how I initialize the object, then use those attributes to create satellite, hub, and link structures by calling the respective methods.

import hashlib


class AthleteActivity(object):
    def __init__(self, params):
        # hub and link keys are MD5 hashes of the business keys
        self.athlete_hub_seq = hashlib.md5(str(params['athlete_id']).encode()).hexdigest()
        self.athlete_id = params['athlete_id']
        self.activity_hub_seq = hashlib.md5(str(params['activity_id']).encode()).hexdigest()
        self.activity_id = params['activity_id']
        self.athlete_activity_link_seq = hashlib.md5(
            (str(self.athlete_id) + str(self.activity_id)).encode()).hexdigest()
        self.activity_type = params.get('type', 'None')
        self.activity_name = params.get('name', None)
        self.distance_m = params.get('distance', None)
        self.distance_mi = params.get('distance_mi', None)
        self.elapsed_time_s = params.get('elapsed_time_s', None)
        self.start_date = params.get('start_date', None)
        self.average_speed_m = params.get('average_speed', None)
        self.average_speed_mi = params.get('average_speed_mi', None)
        self.max_speed_m = params.get('max_speed', None)
        self.max_speed_mi = params.get('max_speed_mi', None)
        self.load_date = params.get('load_date', None)
        self.record_source = 'Strava'
        self.deleted_ind = False
        self.activity_description = params.get('description', None)
        self.moving_time_s = params.get('moving_time_s', None)
        # cast to float so a 0 is not sent as an int; float(None) would raise, so guard it
        elevation_gain = params.get('total_elevation_gain', None)
        self.total_elevation_gain_m = None if elevation_gain is None else float(elevation_gain)
        self.elev_high_m = params.get('elev_high', None)
        self.elev_low_m = params.get('elev_low', None)
        self.trainer = params.get('trainer', None)
        self.commute = params.get('commute', None)
        self.manual = params.get('manual', None)
        self.private = params.get('private', None)
        self.flagged = params.get('flagged', None)
        self.workout_type = params.get('workout_type', None)
        self.kilojoules = params.get('kilojoules', None)
        self.average_watts = params.get('average_watts', None)
        self.device_watts = params.get('device_watts', None)
        self.max_watts = params.get('max_watts', None)
        self.weighted_average_watts = params.get('weighted_average_watts', None)
        self.calories = params.get('calories', None)

    def hub(self):
        hub_record = [{
            'activity_hub_seq': self.activity_hub_seq,
            'activity_id': self.activity_id,
            'hub_load_date': self.load_date,
            'record_source': self.record_source
        }]
        return hub_record

    def satellite(self):
        satellite_record = {
            'activity_hub_seq': self.activity_hub_seq,
            'sat_load_date': self.load_date,
            'activity_name': self.activity_name,
            'activity_description': self.activity_description,
            'distance_m': self.distance_m,
            'distance_mi': self.distance_mi,
            'elapsed_time_s': self.elapsed_time_s,
            'moving_time_s': self.moving_time_s,
            'activity_type': self.activity_type,
            'start_date': self.start_date,
            'average_speed_m': self.average_speed_m,
            'average_speed_mi': self.average_speed_mi,
            'max_speed_m': self.max_speed_m,
            'max_speed_mi': self.max_speed_mi,
            'total_elevation_gain_m': self.total_elevation_gain_m,
            'elev_high_m': self.elev_high_m,
            'elev_low_m': self.elev_low_m,
            'trainer': self.trainer,
            'commute': self.commute,
            'manual': self.manual,
            'private': self.private,
            'flagged': self.flagged,
            'workout_type': self.workout_type,
            'kilojoules': self.kilojoules,
            'average_watts': self.average_watts,
            'device_watts': self.device_watts,
            'max_watts': self.max_watts,
            'weighted_average_watts': self.weighted_average_watts,
            'calories': self.calories,
            'record_source': self.record_source,
            'delete_ind': self.deleted_ind
        }
        # hash the descriptive attributes, skipping keys and load metadata
        to_be_hashed = []
        ignore_list = ['activity_hub_seq', 'sat_load_date', 'delete_ind']
        for k in list(satellite_record):
            if k not in ignore_list:
                if satellite_record[k] is None:
                    # the listing is cut off at this branch; using an empty
                    # string for None (and str() otherwise) is an assumption
                    to_be_hashed.append('')
                else:
                    to_be_hashed.append(str(satellite_record[k]))
        # the listing does not show where hash_string is stored
        hash_string = hashlib.md5(''.join(to_be_hashed).encode()).hexdigest()
        return [satellite_record]

    def link(self):
        link_record = [{
            'athlete_activity_seq': self.athlete_activity_link_seq,
            'athlete_hub_seq': self.athlete_hub_seq,
            'athlete_id': self.athlete_id,
            'activity_hub_seq': self.activity_hub_seq,
            'activity_id': self.activity_id,
            'link_load_date': self.load_date,
            'record_source': self.record_source
        }]
        return link_record
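
The hashing in the class above can also be pulled out into two small helpers. This is a sketch of a variation, not the code the pipeline runs: hash_key and hash_diff are hypothetical names, and the '|' delimiter is my own addition. For a single key, hash_key produces the same hub hash as the class. For the link it differs from the class, because joining IDs without a delimiter lets athlete 1 with activity 23 collide with athlete 12 with activity 3.

```python
import hashlib


def hash_key(*business_keys, delimiter='|'):
    """MD5 over one or more business keys, joined with a delimiter."""
    joined = delimiter.join(str(k) for k in business_keys)
    return hashlib.md5(joined.encode()).hexdigest()


def hash_diff(record, ignore=('activity_hub_seq', 'sat_load_date', 'delete_ind')):
    """MD5 over a record's descriptive attributes, in sorted key order.

    Keys in `ignore` are skipped so that reloading identical data at a
    later time yields the same hash. None is hashed as an empty string.
    """
    parts = ['' if record[k] is None else str(record[k])
             for k in sorted(record) if k not in ignore]
    return hashlib.md5('|'.join(parts).encode()).hexdigest()


first_load = {'activity_hub_seq': 'abc', 'sat_load_date': '2020-01-01 00:00:00',
              'activity_name': 'Morning Run', 'distance_m': 5000.0}
reload_same = dict(first_load, sat_load_date='2020-01-02 00:00:00')
renamed = dict(first_load, activity_name='Lunch Run')
```

Comparing hash_diff(first_load) with hash_diff(reload_same) gives equal hashes, while renamed produces a different one, which is the kind of check a satellite load can use to skip unchanged rows.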

With the pipeline built and deployed, data is now flowing into BigQuery.

I had a few bugs crop up once it was implemented. For instance, activities that were not distance-based came in with a 0 for elevation gain, which caused an error in BigQuery: that field is a float in BigQuery, but the 0 was being treated as an int. As always, Stackdriver is a huge help in dissecting these issues.
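
A minimal sketch of the kind of coercion that avoids this class of error is below. The helper name to_float_or_none is hypothetical. It casts numeric values to float before the row is serialized and passes None through, so a missing value does not raise.

```python
import json


def to_float_or_none(value):
    """Coerce a numeric value to float, passing None through unchanged."""
    return None if value is None else float(value)


# An int 0 serializes as 0, while a float serializes as 0.0.
as_int = json.dumps({'total_elevation_gain_m': 0})
as_float = json.dumps({'total_elevation_gain_m': to_float_or_none(0)})
```

Doing the cast at the point where the row is built keeps the JSON consistent with the column type, whatever the API happens to return for a given activity.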

The last thing I now owe the PFC members is a way to look at all this data. Join me next time as I run through a build out of a Data Studio dashboard.
