Scraping, Parsing and Analysis of the Crossfit competitions
2024-04-15
Gabriel Montpetit

In this post, we will check whether it's possible to find patterns within CrossFit competitions, i.e. which movements are more likely to come up together.
More information on Market Basket Analysis (MBA) can be found here: https://www.turing.com/kb/market-basket-analysis
I also tried to reverse-engineer the Games REST API but couldn't find anything for the workouts. If you are interested, I was able to use these endpoints:
List of Open Workouts for 2023
Information for the 2023 Open
I couldn't find the workouts endpoint so I chose to scrape the data instead.
How
- Scrape the data from games.crossfit.com with Beautiful Soup.
- Parse the content, extracting all the workouts.
- Fetch a list of possible exercises online from wodcat's API.
- Fuzzy match the exercises from the workouts to the exercises from the wodcat API.
- Apply MBA with mlxtend
- Create a bar plot of movements per year
Scoping
There is a wide range of possible combinations, so let's reduce the scope to something a bit more manageable:
- Focus on the Opens, Quarterfinals, Semifinals and Games.
- No loading or rep details such as weights or number of repetitions
- No age groups: just Rx (men and women)
Install the libraries and Imports
%%capture
!pip install pandas==2.2.1
!pip install beautifulsoup4==4.12.3
!pip install tqdm==4.66.2
!pip install mlxtend==0.23.1
!pip install jellyfish==1.0.3
!pip install anytree==2.12.1
!pip install networkx==3.3
!pip install seaborn==0.13.2
# assumption: lxml is also needed, since BeautifulSoup is called with features="lxml" below
!pip install lxml
from __future__ import annotations
import dataclasses
import json
import re
import unicodedata
from dataclasses import dataclass
from enum import Enum
from http import HTTPStatus
from typing import Any, Callable
from urllib.parse import urljoin
import jellyfish
import networkx as nx
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from bs4 import BeautifulSoup, Tag, ResultSet
from mlxtend.frequent_patterns import association_rules
from mlxtend.frequent_patterns import fpgrowth
from mlxtend.preprocessing import TransactionEncoder
from networkx import connected_components
from tqdm import tqdm
Enums, Dataclasses and Variables
class Event(Enum):
OPEN = 1
QUARTERFINALS = 2
SEMIFINALS = 3
GAMES = 4
@dataclass
class Workout:
name: str
year: int
event: Event
content: list[str]
@dataclass
class BaseUrl:
base: str
prefix: str
min_year: int
max_year: int
event: Event
excluded_years: list[int] | None = None
# used to make it look like a normal browser
header = {
"User-Agent": " ".join([
"Mozilla/5.0",
"(Linux; Android 10; K)",
"AppleWebKit/537.36 (KHTML, like Gecko)",
"Chrome/114.0.0.0 Mobile Safari/537.36"
]),
"X-Requested-With": "XMLHttpRequest"
}
open_url = BaseUrl(
base="https://games.crossfit.com",
prefix="workouts/open",
min_year=2011,
max_year=2023,
event=Event.OPEN
)
quarterfinals_url = BaseUrl(
base="https://games.crossfit.com",
prefix="workouts/onlinequalifiers",
min_year=2014,
max_year=2024,
event=Event.QUARTERFINALS
)
semifinals_url = BaseUrl(
base="https://games.crossfit.com",
prefix="workouts/semifinals",
min_year=2021,
max_year=2023,
event=Event.SEMIFINALS
)
games_url = BaseUrl(
base="https://games.crossfit.com",
prefix="workouts/games",
min_year=2007,
max_year=2023,
event=Event.GAMES
)
1. Scrape the data from games.crossfit.com with Beautiful Soup.
In this section, we define utility methods:
- extract_text: extracts and standardizes text fields.
- find_pre_2017: parses pre-2017 Open, Quarterfinals and Semifinals pages.
- find_non_games_workouts: used for the Open, Quarterfinals and Semifinals.
- find_quarter_finals_workouts: for the Quarterfinals.
- find_semi_finals_workouts: for the Semifinals.
- find_games_workouts: for the Games.
def extract_text(results: ResultSet) -> list[str]:
"""Extract all text from a Beautifulsoup ResultSet.
Applies normalization and removes any whitespaces.
:param results: The ResultSet to extract text from.
:return: Returns the normalized text as a list.
"""
flatten = lambda nested: [item for x in nested for item in x]
is_tag = lambda t: isinstance(t, Tag)
non_empty_string = lambda s: len(s) > 0
return list(
filter(
non_empty_string,
flatten(
list(
map(
lambda y: [
unicodedata.normalize("NFKD", e).strip() for e in y.get_text().splitlines()
],
filter(
is_tag,
results
)
)
)
)
)
)
def find_pre_2017(page: BeautifulSoup, year: int, event: Event) -> list[Workout]:
"""Find all workouts from a pre-2017 competition. The pages are not formatted the same.
:param page: The BeautifulSoup instance to search the pre-2017 workouts with.
:param year: The current search year.
:param event: The type of event.
:return: The list of Workouts found.
"""
flatten = lambda nested: [item for x in nested for item in x]
workouts = []
lis = page.find_all("li", {
"id": re.compile("workoutsTab.*")
})
names = [h.get_text() for h in page.find_all("h3", class_="c-heading-data")]
for i, li in enumerate(lis):
content = []
for l in li.find('section'):
if isinstance(l, Tag):
content.append(
list(
filter(
lambda x: x and len(x) > 0,
[unicodedata.normalize("NFKD", e).strip() for e in l.get_text().split("\n")]
)
)
)
workouts.append(Workout(
name=names[i],
year=year,
event=event,
content=flatten(content)
))
return workouts
def find_non_games_workouts(base_url: BaseUrl) -> list[Workout]:
"""Find all non-games workout from a base_url.
:param base_url: The BaseUrl dataclass.
:return: The list of Workouts.
"""
workouts = []
for year in tqdm(range(base_url.min_year, base_url.max_year + 1), desc=base_url.event.name):
if year == 2022 and base_url.event == Event.SEMIFINALS:
continue
get_response = requests.get(urljoin(base_url.base, "/".join([base_url.prefix, str(year)])), headers=header)
soup = BeautifulSoup(get_response.text, features="lxml")
if year < 2017:
workouts.extend(find_pre_2017(soup, year, base_url.event))
else:
ordinals = soup.findChildren("div", class_="ordinals")
for a in ordinals[0]:
if isinstance(a, Tag):
ordinal = requests.get(urljoin(base_url.base, a["href"]), headers=header)
ordinal_soup = BeautifulSoup(ordinal.text, features="lxml")
name = ordinal_soup.find_all("div", class_="heading")[0].text
content = extract_text(ordinal_soup.find_all("div", class_="exercises")[0])
workouts.append(
Workout(
name=name,
year=year,
event=base_url.event,
content=content
)
)
return workouts
def find_games_workouts(base_url: BaseUrl) -> list[Workout]:
"""Method to find all Games workouts from a BaseUrl dataclass.
:param base_url: The BaseUrl dataclass of the games page.
:return: All the Games workout found.
"""
workouts = []
for year in tqdm(range(base_url.min_year, base_url.max_year + 1), desc=base_url.event.name):
r = requests.get(urljoin(base_url.base, "/".join([base_url.prefix, str(year)])), headers=header)
soup = BeautifulSoup(r.text, features="lxml")
events_details = soup.find_all("div", id="events-details")
names = [n.text for n in events_details[0].find_all("div", class_="events-heading")]
details = soup.find_all("div", id="events-details")[0].find_all("div", class_=["description collapse",
"description collapse in"])
for idx, workout in enumerate(details):
content = []
for elem in workout:
if isinstance(elem, Tag):
for e in elem.text.split("\n"):
if e and len(e) > 0:
content.append(unicodedata.normalize("NFKD", e))
workouts.append(Workout(
name=names[idx],
year=year,
event=Event.GAMES,
content=content
))
return workouts
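Before moving on, here's a quick illustration of what extract_text returns, using a small made-up HTML snippet (not an actual games.crossfit.com page):

snippet = BeautifulSoup(
    "<div class='exercises'><p>21-15-9 reps of:</p><p>Thrusters\nPull-ups</p></div>",
    features="lxml",
)
print(extract_text(snippet.find_all("p")))
# ['21-15-9 reps of:', 'Thrusters', 'Pull-ups']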
2. Parse the content, extracting all the workouts.
Now that we have all our scraping methods, let's use them to find all the Open, Quarterfinals, Semifinals and Games workouts.
def find_opens_workouts(base_url: BaseUrl) -> list[Workout]:
return find_non_games_workouts(base_url)
def find_quarter_finals_workouts(base_url: BaseUrl) -> list[Workout]:
return find_non_games_workouts(base_url)
def find_semi_finals_workouts(base_url: BaseUrl) -> list[Workout]:
return find_non_games_workouts(base_url)
all_opens_workouts = find_opens_workouts(open_url)
all_quarterfinals_workouts = find_quarter_finals_workouts(quarterfinals_url)
all_semifinals_workouts = find_semi_finals_workouts(semifinals_url)
all_games_workouts = find_games_workouts(games_url)
all_workouts = all_games_workouts + all_opens_workouts + all_quarterfinals_workouts + all_semifinals_workouts
# should be 320
len(all_workouts)
OPEN: 100%|██████████| 13/13 [01:16<00:00, 5.85s/it]
QUARTERFINALS: 100%|██████████| 11/11 [01:20<00:00, 7.34s/it]
SEMIFINALS: 100%|██████████| 3/3 [00:18<00:00, 6.16s/it]
GAMES: 100%|██████████| 17/17 [00:15<00:00, 1.09it/s]
320
Save a copy on disk so that we don't have to re-scrape the data:
# save the workouts as json
with open("all_workouts.json", "w") as f:
json_serializable = []
for w in all_workouts:
d = dataclasses.asdict(w)
d["event"] = d["event"].name
json_serializable.append(d)
f.write(json.dumps(json_serializable, indent=4))
f.close()
# reload the saved copy
all_workouts = json.loads(open("all_workouts.json", "r").read())
# show the data
competitions_df = pd.DataFrame(all_workouts)
competitions_df.head(3)
  | name | year | event | content
---|---|---|---|---|
0 | CrossFit Total | 2007 | GAMES | [Back squat 1-rep max, Press 1-rep max, Deadli... |
1 | Hopper-Style CrossFit WOD | 2007 | GAMES | [For time:, 1,000-meter row, Then, 5 rounds of... |
2 | Trail Run | 2007 | GAMES | [Trail run (approximately 5 k)] |
3. Fetch a list of possible exercises online from wodcat's API.
def get_exercises(start_url: str) -> list[dict[str, Any]]:
    """Recursive method to get all exercises from the wodcat API, following pagination.
    :param start_url: The starting point URL.
    :return: The list of exercises, where each exercise is a dictionary.
    """
    http_response = requests.get(start_url, headers=header)
    if http_response.status_code != HTTPStatus.OK:
        return []
    response_content = json.loads(http_response.text)
    response_data = response_content["results"]
    # follow the paginated "next" link until the last page is reached
    if response_content.get("next"):
        response_data.extend(get_exercises(response_content["next"]))
    return response_data
# invoke
wodcat_exercises = get_exercises("https://wodcat.com/api/w/v3/exercises/")
max_id = max([e["id"] for e in wodcat_exercises])
missing = [
{
"slug": "ground-to-overheads",
"name": "ground to overheads"
}
]
# add some missing exercises
for m in missing:
max_id += 1
wodcat_exercises.append(
{
"id": max_id,
**m
}
)
# save a copy in json format for future use
with open("exercises.json", "w") as f:
f.write(json.dumps(wodcat_exercises, indent=4))
f.close()
# show data
pd.DataFrame(wodcat_exercises).head(3)
  | id | slug | name | localname | modality | class_exercise | equipment_type | level | video | teaser | abbreviation
---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2 | air-squat | Air Squat | Air Squats | G | Squats | Lung | https://www.youtube.com/embed/C_VtOYc6j5c | <p><strong>Air squat</strong>, also called a b... | ||
1 | 224 | alternating-dumbbell-snatches | Alternating dumbbell snatches | Alternating dumbbell snatches | W | Snatch | Dumbbell | Average | https://www.youtube.com/embed/KP7A3cFazOc | <p>The Dumbbell Snatch is variation of Snatch ... | |
2 | 180 | alternating-floor-press | Alternating floor press | Alternating floor press | W | Presses | Dumbbell | Average | https://www.youtube.com/embed/DJwNDf0XDys | <p>This exercise is a species of basic “... |
4. Fuzzy Match
I wasn't able to find the workouts in the Games API, so I had to scrape the data instead. This came at a cost: the workout text is not as clean as I would like it to be. For example, "handstand" and "handstand walk" are not the same string, yet they need to be reconciled somehow. We need a way to match the workout text to the exercises from wodcat's API.
One approach that I think will work is fuzzy matching. We can use the Jaro similarity (via jellyfish's jaro_similarity) to fuzzy match the workouts to the exercises. By "tokenizing" the competition text into Tokens, we will be able to compare wodcat Tokens with competition Tokens.
The "Tokenization" algorithm will take a minimum and maximum word count per token and create a sequence of Tokens with the min/max as the boundaries. It will also remove any capitalization, punctuation, dashes and extra whitespaces. We also assume that the words are already in the correct order (because we normally read from left to right in the west).
For example, given a minimum of 1 and a maximum of 3, the text "For time: 10 pig flips" will be tokenized into: ['for'], ['for', 'time'], ['for', 'time', '10'], ['time'], ['time', '10'], ['time', '10', 'pig'], ['10'], ['10', 'pig'], ['10', 'pig', 'flips'], ['pig'], ['pig', 'flips'], ['flips']
This is quite computationally expensive and likely would not scale well for larger text.
So we need to Tokenize both sources of data so that their tokens can be compared. For the Wodcat tokens, the minimum is equal to the maximum so that 1 Token == 1 Exercise.
For example, given the following:
Wodcat Token
Token(['pig', 'flips'])
Competition Tokens
Token(words=['for']), Token(words=['for', 'time']), Token(words=['for', 'time', '10']), Token(words=['time']), Token(words=['time', '10']), Token(words=['time', '10', 'pig']), Token(words=['10']), Token(words=['10', 'pig']), Token(words=['10', 'pig', 'flips']), Token(words=['pig']), Token(words=['pig', 'flips']), Token(words=['flips'])
the following scores would be generated:
[8, 7, 10, 8, 8, 8, 9, 7, 3, 6, 0, 4]
Thus, with a score of 0 at index 10, we know we have a match for "pig flips". (Note: in this illustrative example the scores behave like edit distances, where 0 means an exact match; the actual implementation below uses the Jaro similarity, where 1.0 means an exact match.)
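For reference, here is how jellyfish's Jaro similarity (the metric used by the Token.sim_score method defined below) behaves, where 1.0 means identical:

import jellyfish

# identical strings score 1.0, unrelated strings score much lower
print(jellyfish.jaro_similarity("pig flips", "pig flips"))  # 1.0
print(jellyfish.jaro_similarity("pig flips", "for time"))   # noticeably lower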
In order to do that, we will define 2 classes, Token and Text, that will help us tokenize the workouts and the exercises.
- Token: A token is a list of words that can be compared to another token using the Jaro similarity.
- Text: A text is a string that can be tokenized into a list of tokens. The Text class uses the Token class to tokenize the text.
@dataclass
class Token:
words: list[str]
def sim_score(self, other: Token) -> float:
"""Compare two tokens using the jaro_similarity which should give a number between 0 and 1 where 0 is completely different, and 1 is the same.
We make sure to remove any capital letters as well, without changing the original data.
:param other: The other token to compare with.
:return: The jaro_similarity score between the two tokens.
"""
return jellyfish.jaro_similarity(" ".join(self.words), " ".join(other.words))
@property
def word_count(self) -> int:
return len(self.words)
@property
def no_spaces_char_count(self) -> int:
return len(
"".join(self.words)
)
def __hash__(self):
return hash(" ".join(self.words))
def __str__(self):
return "_".join(self.words)
class Text:
def __init__(self, text: str):
self.text = text
self._wc = None
@property
def uncapitalize(self) -> Text:
"""Remove any capitalization from a text.
:return: The text without any capitalization.
"""
return Text(self.text.lower())
@property
def remove_punctuation(self) -> Text:
"""Remove any punctuation from a text.
:return: The text without any punctuation.
"""
return Text(re.sub(r'[^\w\s^-]', '', self.text))
@property
def remove_dashes(self) -> Text:
"""Remove any dashes from a text.
:return: The text without any dashes.
"""
return Text(re.sub(r'-', ' ', self.text))
@property
def remove_extra_whitespaces(self) -> Text:
"""Remove any extra whitespaces from a text.
:return: The text without any extra whitespaces.
"""
return Text(re.sub(r'\s+', ' ', self.text))
@property
def replace_edge_cases(self) -> Text:
"""Replace any edge cases from a text. Example: sometimes, rope climbs are written as rope ascends.
:return: The text with any edge cases replaced.
"""
replacements = {
"rope ascents": "rope climb",
"snail push": "sled push",
"muscle ups": "ring muscle ups",
"double unders": "double unders jump rope",
"pistol": "pistol squat",
"single leg squat": "pistol squat",
"echo bike": "assault air bike "
}
for k, v in replacements.items():
self.text = self.text.replace(k, v)
return Text(self.text)
def split(self, separator: str) -> list[str]:
"""Split a text into a list of words using a separator.
:param separator: The separator to split the text with.
:return: The list of words.
"""
return self.text.split(separator)
@property
def word_count(self) -> int:
return self._wc
def tokenize(self, min_word_count: int = 5, max_word_count: int = 5) -> list[Token]:
"""Tokenize a list of words into a Token. The max_word_count corresponds to the maximum number of words to keep per token.
Note that this default value is the maximum word count from wodcat's API.
We want to make sure to remove any capitalization as well.
:param min_word_count: The minimum token word count value. Example: 2 would mean that one token should take a minimum of 2 words.
:param max_word_count: The maximum token word count value. Example: 5 would be 1 token can take up to 5 words.
:return: The list of Tokens.
"""
if min_word_count > max_word_count:
raise AttributeError("min_word_count should be less than or equal to max_word_count")
words = self.uncapitalize.remove_dashes.replace_edge_cases.remove_punctuation.remove_extra_whitespaces.split(
" ")
self._wc = len(words)
if self._wc <= min_word_count:
return [Token(words)]
tokens = []
for i in range(0, len(words) - min_word_count + 1):
for j in range(min_word_count, max_word_count + 1):
if i + j > len(words):
break
tokens.append(
Token(words[i: i + j])
)
return tokens
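As a quick sanity check, we can reproduce the "pig flips" example from earlier (a minimal usage sketch; the full list of 12 tokens matches the one shown above):

demo_tokens = Text("For time: 10 pig flips").tokenize(min_word_count=1, max_word_count=3)
print([t.words for t in demo_tokens])
# [['for'], ['for', 'time'], ['for', 'time', '10'], ['time'], ..., ['pig', 'flips'], ['flips']]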
Tokenize
Now that we have our utility classes, we can start tokenizing the Wodcat exercises and the competition exercises.
Let's start with Wodcat:
wodcat_exercises = json.loads(open("exercises.json", "r").read())
flatten = lambda nested: [item for x in nested for item in x]
wodcat_tokens = flatten([Text(x["name"]).tokenize() for x in sorted(wodcat_exercises, key=lambda x: x["id"])])
wodcat_series = pd.Series(wodcat_tokens, name="tokens")
wodcat_series.head(3)
0 air_squat
1 back_squat
2 front_squat
Name: tokens, dtype: object
Tokenize Competitions Text
In this section, we convert all the competition text into tokens so that we can assign scores to each Token.
First, let’s check the maximum token size for the exercises from wodcat so that we can limit the tokenization process. We don’t want to create Tokens that are 6 words long if the maximum Wodcat Tokens are 5 words long.
Then we can tokenize the competitions text.
max_word_count = max([x.word_count for x in wodcat_tokens])
tokenized_df = competitions_df.assign(**{
"tokens": competitions_df["content"].apply(lambda x: " ".join(x)).apply(
lambda z: Text(z).tokenize(min_word_count=1, max_word_count=max_word_count))
})
tokenized_df.sample(3)
  | name | year | event | content | tokens
---|---|---|---|---|---|
15 | Amanda | 2010 | GAMES | [Amanda, For time:, 9-7-5, Muscle-ups, Squat S... | [amanda, amanda_for, amanda_for_time, amanda_f... |
279 | Workout 2 | 2019 | QUARTERFINALS | [For time:, 80 bar-facing burpees, 4,000-meter... | [for, for_time, for_time_80, for_time_80_bar, ... |
106 | Assault Banger | 2017 | GAMES | [For time:, 40/30-cal. Assault Bike, 20-ft. Ba... | [for, for_time, for_time_4030, for_time_4030_c... |
Fuzzy Match Score
Now we can define a score and a get_best_match method that perform the following tasks:
- score: compares competition tokens with wodcat tokens using the Jaro similarity.
- get_best_match: returns the best matches (given a threshold) for a row. Here we retrieve the matching wodcat indices.
def score(c_tokens: list[Token], w_tokens: list[Token]) -> np.ndarray:
"""Score the tokens by comparing the tokens from the workouts to the tokens from wodcat.
    The Token class has a built-in compare method that uses the Jaro similarity score.
:param c_tokens: The competition tokens.
:param w_tokens: The wodcat tokens
:return: The scores.
"""
scores = []
for c_token in c_tokens:
scores.append(
np.array([c_token.sim_score(w_token) for w_token in w_tokens])
)
return np.array(scores)
def get_best_match(scores: pd.Series, threshold: float = 0.87) -> list[Token]:
"""Get the best match for a row containing tokens.
    :param scores: The Jaro similarity scores for one workout, as a 2D array (competition tokens x wodcat tokens).
:param threshold: The threshold to consider a match.
:return: The best match.
"""
return wodcat_series.iloc[np.where(scores > threshold)[1].tolist()].to_numpy().tolist()
tokenized_df["score"] = tokenized_df["tokens"].apply(
lambda tokens: score(tokens, wodcat_tokens)
)
tokenized_df.head(3)
  | name | year | event | content | tokens | score
---|---|---|---|---|---|---|
0 | CrossFit Total | 2007 | GAMES | [Back squat 1-rep max, Press 1-rep max, Deadli... | [back, back_squat, back_squat_1, back_squat_1_... | [[0.4537037037037037, 0.7999999999999999, 0.0,... |
1 | Hopper-Style CrossFit WOD | 2007 | GAMES | [For time:, 1,000-meter row, Then, 5 rounds of... | [for, for_time, for_time_1000, for_time_1000_m... | [[0.48148148148148145, 0.0, 0.6464646464646465... |
2 | Trail Run | 2007 | GAMES | [Trail run (approximately 5 k)] | [trail, trail_run, trail_run_approximately, tr... | [[0.5333333333333333, 0.43333333333333335, 0.3... |
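Before applying get_best_match, it is worth spelling out the indexing trick: np.where(scores > threshold) returns a (row_indices, column_indices) pair, and we keep the column indices, which point into wodcat_series. A tiny illustration with made-up scores:

demo_scores = np.array([
    [0.20, 0.95, 0.10],   # competition token 0 vs 3 wodcat tokens
    [0.88, 0.30, 0.40],   # competition token 1 vs 3 wodcat tokens
])
rows, cols = np.where(demo_scores > 0.87)
print(cols.tolist())  # [1, 0] -> positions to look up in wodcat_series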
Now we can retrieve the best matches for each workout's Tokens.
# find the best matches
matches_df = tokenized_df.assign(**{
"best_match": tokenized_df["score"].apply(get_best_match)
})
matches_df[["name", "year", "event", "content", "best_match"]]
  | name | year | event | content | best_match
---|---|---|---|---|---|
0 | CrossFit Total | 2007 | GAMES | [Back squat 1-rep max, Press 1-rep max, Deadli... | [back_squat, back_squat, back_squat, axle_dead... |
1 | Hopper-Style CrossFit WOD | 2007 | GAMES | [For time:, 1,000-meter row, Then, 5 rounds of... | [row, pull_up, pull_up, push_jerk, jerk] |
2 | Trail Run | 2007 | GAMES | [Trail run (approximately 5 k)] | [run] |
3 | Sunday's Workout | 2008 | GAMES | [For time:, 30 Squat Clean and Jerks, (155/100... | [squat_clean, squat_clean, clean, clean_and_je... |
4 | The Hill Run | 2008 | GAMES | [The hill run is a steep, off-trail run approx... | [run, run, run, run] |
... | ... | ... | ... | ... | ... |
315 | Individual Workout 3 | 2023 | SEMIFINALS | [For time: Semifinals Linda — 10, 9, 8, 7, 6, ... | [deadlift, dumbbell_snatch, dumbbell_clean, be... |
316 | Individual Workout 4 | 2023 | SEMIFINALS | [For load:, 800-m Assault AirRunner, Max snatc... | [snatch, rest] |
317 | Individual Workout 5 | 2023 | SEMIFINALS | [For time:, 8 snatches, 800-m Assault AirRunne... | [snatch] |
318 | Individual Workout 6 | 2023 | SEMIFINALS | [For time:, 20 overhead squats, 500-m row, 3 h... | [overhead_walk, overhead_squat, overhead_squat... |
319 | Individual Workout 7 | 2023 | SEMIFINALS | [3 rounds for time of:, 15/10 Echo Bike calori... | [assault_air_bike, assault_air_bike, assault_a... |
320 rows × 5 columns
At first glance, the above results appear to be fine. However, upon closer inspection, we can observe 3 problems:
- Duplicates
- Duplicates with shorter tokens (example: "handstand" and "strict handstand")
- Incorrect matches (or partial matches portrayed as correct movements). For example, "handstand push up" and "push ups" are not the same movement, but because the text partially matches "push ups", it is considered a match.
Simple Solution
So to fix those problems we can use a simple solution:
For problem nb. 1: Transform the lists to sets (Token has the __hash__ method implemented).
For problems nb. 2 and 3: Consider only the longer-worded Token when a Token's words are contained within another Token. Caveat: we may actually drop valid exercises. In order to do this we could:
- sort the list of Tokens by ascending size
- generate a "dynamic" regex with the head of the array and compare it to the subsequent tail
- use a networkx graph to find all the connected subgraphs and only keep the Token with the maximum size in each
Let's add this functionality.
Deduplication and Clustering
def compare_tokens(left: Token, right: Token, min_overlap: int = 4) -> int:
"""Compare two tokens, a left and a right and find the identical sequential characters.
The left token **must** always have a smaller character length than the right token.
:param left: The left token (smaller than right)
:param right: The right token.
:param min_overlap: The minimum number of characters that must be overlapping to consider the tokens identical.
:return: The score represented as the number of sequentially overlapping characters.
"""
    if left.no_spaces_char_count > right.no_spaces_char_count:
        raise AttributeError("Left length must be smaller than Right length.")
left_string = "".join(left.words)
right_string = "".join(right.words)
dynamic_regex = []
for i in range(min_overlap, len(left_string) + 1):
dynamic_regex.append(
left_string[0:i]
)
regex = f"(?=.*)({'|'.join(reversed(dynamic_regex))})+(?=.*)"
search = re.findall(regex, right_string)
if len(search) > 0:
return len(max(search))
return 0
def cluster_tokens(tokens: list[Token], threshold: int = 4) -> list[Token]:
"""Cluster all the tokens that have a minimum sequentially overlapping number of characters.
We then pick the Token that has the biggest character count.
:param tokens: The list of Tokens.
:param threshold: The minimum overlapping threshold.
:return: A list of Tokens, where each token represents the clustered Tokens.
Example: 'pull up' and 'chest to bar pull ups' are grouped into 'chest to bar pull ups'
because they have a minimum of 4 overlapping sequential characters.
"""
# sort by token character count, excluding white spaces
sorted_tokens = sorted(tokens, key=lambda x: x.no_spaces_char_count)
graph = nx.Graph()
for i in range(0, len(sorted_tokens)):
for j in range(i + 1, len(sorted_tokens)):
sim = compare_tokens(sorted_tokens[i], sorted_tokens[j], threshold)
if sim >= threshold:
graph.add_edge(sorted_tokens[j], sorted_tokens[i])
if not graph.has_node(sorted_tokens[i]):
graph.add_edge(sorted_tokens[i], sorted_tokens[i])
return [max(set(graph.subgraph(c)), key=lambda x: x.no_spaces_char_count) for c in connected_components(graph)]
df = matches_df[["name", "year", "event", "best_match"]].assign(**{
"clusters": matches_df["best_match"].apply(cluster_tokens),
"basket": lambda row: row.apply(
lambda x: f'{x["name"]}_{x["year"]}', axis=1
)
}).assign(**{
"target": lambda row: row["clusters"].apply(
lambda x: [str(token) for token in x]
)
})
df[["basket", "clusters", "target"]].to_json("./baskets.json", orient="records", indent=4)
df.head(3)
  | name | year | event | best_match | clusters | basket | target
---|---|---|---|---|---|---|---|
0 | CrossFit Total | 2007 | GAMES | [back_squat, back_squat, back_squat, axle_dead... | [axle_deadlift, back_squat] | CrossFit Total_2007 | [axle_deadlift, back_squat] |
1 | Hopper-Style CrossFit WOD | 2007 | GAMES | [row, pull_up, pull_up, push_jerk, jerk] | [row, push_jerk, pull_up] | Hopper-Style CrossFit WOD_2007 | [row, push_jerk, pull_up] |
2 | Trail Run | 2007 | GAMES | [run] | [run] | Trail Run_2007 | [run] |
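As a quick sanity check of the clustering step, here is a minimal sketch with hand-made tokens, matching the pull-up example from the docstring:

demo = [
    Token(["pull", "up"]),
    Token(["chest", "to", "bar", "pull", "ups"]),
    Token(["row"]),
]
# "pullup" overlaps "chesttobarpullups" on 6 sequential characters
print(compare_tokens(Token(["pull", "up"]), Token(["chest", "to", "bar", "pull", "ups"])))  # 6
print([str(t) for t in cluster_tokens(demo)])
# ['row', 'chest_to_bar_pull_ups'] (order may vary)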
5. Apply MBA with mlxtend
Now we can use the dataset to find patterns inside the data (see mlxtend).
First we create a utility method so that we can re-use it with different parameters:
def mba(
source_df: pd.DataFrame,
target_column: str,
agg_cols: list[str],
min_support: float = 0.5,
metric: str = "confidence",
min_threshold: float = 0.3,
custom_filter: Callable[[pd.DataFrame], pd.DataFrame] | None = None
) -> pd.DataFrame:
"""Create an Association Rules dataframe with a source_df.
    :param source_df: The input dataframe.
:param target_column: The items column.
:param agg_cols: The aggregation column (ie. the basket).
:param min_support: The minimum support threshold: mlxtend is set to 0.5 by default.
    :param metric: 'support', 'confidence', 'lift', 'leverage', 'conviction' and 'zhangs_metric'. This parameter is tied to min_threshold.
:param min_threshold: The minimum threshold value to keep for the metric
:param custom_filter: Extra filter to add to the Association Rules results.
:return: The Association Rules dataframe.
"""
agg_df = source_df.groupby(agg_cols).agg({
target_column: "sum"
}).assign(**{
target_column: lambda row: row[target_column].apply(
lambda x: list(set(x))
)
})
encoder = TransactionEncoder()
dataset = agg_df[target_column].to_numpy()
te_ary = encoder.fit(dataset).transform(dataset)
freq_itemsets = pd.DataFrame(te_ary, columns=encoder.columns_)
fp = fpgrowth(freq_itemsets, min_support=min_support, use_colnames=True)
if len(fp) == 0:
raise ValueError("fpgrowth results empty. Try changing the min_support param ?")
ar = association_rules(fp, metric=metric, min_threshold=min_threshold)
if custom_filter:
return ar.loc[custom_filter]
return ar
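To make the moving parts concrete before running it on the real data, here is a tiny toy run of the same helper (made-up baskets, not the scraped workouts):

toy = pd.DataFrame({
    "basket": ["w1", "w2", "w3"],
    "target": [["row", "pull_up"], ["row", "pull_up", "thruster"], ["row"]],
})
# with min_support=0.5, "thruster" (present in only 1/3 of the baskets) is dropped
# and we only get rules between "row" and "pull_up"
print(mba(toy, "target", ["basket"], min_support=0.5)[
    ["antecedents", "consequents", "support", "confidence"]
])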
MBA #1
Now we can check if there are any patterns with the exercises:
mba(df, "target", ["basket"], min_support=0.1)
  | antecedents | consequents | antecedent support | consequent support | support | confidence | lift | leverage | conviction | zhangs_metric
---|---|---|---|---|---|---|---|---|---|---|
0 | (ring_muscle_up) | (muscle_up_strict) | 0.138710 | 0.135484 | 0.129032 | 0.930233 | 6.866002 | 0.110239 | 12.391398 | 0.991948 |
1 | (muscle_up_strict) | (ring_muscle_up) | 0.135484 | 0.138710 | 0.129032 | 0.952381 | 6.866002 | 0.110239 | 18.087097 | 0.988246 |
Interpretation
Given a minimum support of 0.1 and using the confidence metric, the only 2 exercises that pop up are muscle_up_strict and ring_muscle_up. For example, when the antecedent is "muscle_up_strict", we have a confidence of 95% that "ring_muscle_up" also shows up in the same workout. However, when we also consider the consequent support of roughly 0.14, it is not very conclusive.
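As a quick check of where those numbers come from, using the values straight from the table above:

# confidence(A -> B) = support(A and B) / support(A)
antecedent_support = 0.135484   # support(muscle_up_strict)
consequent_support = 0.138710   # support(ring_muscle_up)
joint_support = 0.129032        # support of both together

confidence = joint_support / antecedent_support
print(round(confidence, 4))                        # ~0.9524 (table: 0.952381)

# lift(A -> B) = confidence(A -> B) / support(B)
print(round(confidence / consequent_support, 3))   # ~6.866 (table: 6.866002)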
Next Steps
Let's push it a little further to see if we can get more insight.
Decrease Granularity
So far, we have tried to find trends at the workout level; let's try the same thing per year and event type instead.
MBA #2
mba(df, "target", ["year", "event"], custom_filter=lambda x: (x["support"] * x["confidence"]) > 0.65)
  | antecedents | consequents | antecedent support | consequent support | support | confidence | lift | leverage | conviction | zhangs_metric
---|---|---|---|---|---|---|---|---|---|---|
228 | (ring_muscle_up) | (muscle_up_strict) | 0.860465 | 0.837209 | 0.813953 | 0.945946 | 1.129880 | 0.093564 | 3.011628 | 0.823810 |
229 | (muscle_up_strict) | (ring_muscle_up) | 0.837209 | 0.860465 | 0.813953 | 0.972222 | 1.129880 | 0.093564 | 5.023256 | 0.706122 |
230 | (ring_muscle_up) | (toes_to_bar) | 0.860465 | 0.813953 | 0.767442 | 0.891892 | 1.095753 | 0.067063 | 1.720930 | 0.626263 |
231 | (toes_to_bar) | (ring_muscle_up) | 0.813953 | 0.860465 | 0.767442 | 0.942857 | 1.095753 | 0.067063 | 2.441860 | 0.469697 |
232 | (muscle_up_strict) | (toes_to_bar) | 0.837209 | 0.813953 | 0.744186 | 0.888889 | 1.092063 | 0.062737 | 1.674419 | 0.517857 |
233 | (toes_to_bar) | (muscle_up_strict) | 0.813953 | 0.837209 | 0.744186 | 0.914286 | 1.092063 | 0.062737 | 1.899225 | 0.453125 |
235 | (ring_muscle_up, toes_to_bar) | (muscle_up_strict) | 0.767442 | 0.837209 | 0.720930 | 0.939394 | 1.122054 | 0.078421 | 2.686047 | 0.467742 |
236 | (muscle_up_strict, toes_to_bar) | (ring_muscle_up) | 0.744186 | 0.860465 | 0.720930 | 0.968750 | 1.125845 | 0.080584 | 4.465116 | 0.436950 |
241 | (double_unders_jump_rope_jump) | (ring_muscle_up) | 0.813953 | 0.860465 | 0.744186 | 0.914286 | 1.062548 | 0.043807 | 1.627907 | 0.316406 |
247 | (ring_muscle_up, double_unders_jump_rope_jump) | (toes_to_bar) | 0.744186 | 0.813953 | 0.697674 | 0.937500 | 1.151786 | 0.091942 | 2.976744 | 0.515152 |
248 | (toes_to_bar, double_unders_jump_rope_jump) | (ring_muscle_up) | 0.720930 | 0.860465 | 0.697674 | 0.967742 | 1.124673 | 0.077339 | 4.325581 | 0.397222 |
253 | (ring_muscle_up, double_unders_jump_rope_jump) | (muscle_up_strict) | 0.744186 | 0.837209 | 0.720930 | 0.968750 | 1.157118 | 0.097891 | 5.209302 | 0.530792 |
254 | (muscle_up_strict, double_unders_jump_rope_jump) | (ring_muscle_up) | 0.720930 | 0.860465 | 0.720930 | 1.000000 | 1.162162 | 0.100595 | inf | 0.500000 |
266 | (ring_muscle_up, toes_to_bar, double_unders_ju... | (muscle_up_strict) | 0.697674 | 0.837209 | 0.674419 | 0.966667 | 1.154630 | 0.090319 | 4.883721 | 0.442971 |
267 | (muscle_up_strict, toes_to_bar, double_unders_... | (ring_muscle_up) | 0.674419 | 0.860465 | 0.674419 | 1.000000 | 1.162162 | 0.094105 | inf | 0.428571 |
Okay, now we kinda have something, but it's still not very conclusive. When considering both support and confidence (i.e. support * confidence > 0.65), we can see that when toes-to-bars come up (antecedents), they are also likely to come up with ring muscle-ups (consequents).
Likewise, when there are double-unders and ring muscle-ups (antecedents), it seems toes-to-bars also come up (consequents). Looks like nuking the forearms is somewhat consistent.
Let's just check one last thing: what are the most common exercises by count?
6. Create a bar plot of movements per year
Plotting Utils
def count_movements(
df: pd.DataFrame,
aggregation: list[str],
filter_: Callable = lambda data: (data["target"] != "rest") & (data["counts"] > 1)
) -> pd.DataFrame:
"""Utility method that creates the movement counts per aggregation, for plotting.
:param df: The dataframe to create the plotting data with.
:param aggregation: The aggregation columns.
:param filter_: A custom filter to apply to the data.
:return: The dataframe ready for plotting.
"""
exploded = df.explode("target")[["name", "year", "event", "target"]]
total = exploded[["target"]].value_counts().reset_index(name="total")
counts = (
exploded
.groupby(aggregation)
.agg({
"target": "count",
})
.rename(columns={"target": "counts"})
.reset_index()
)
return pd.merge(
counts, total, on=["target"]
).assign(**{
"percent": lambda x: round((x["counts"] / x["total"]) * 100).astype(int)
}).sort_values(by="total", ascending=False).loc[filter_]
All Competitions, Years and Movements
# distinctive colors
color_list = [
"#000000", "#00FF00", "#0000FF", "#FF0000", "#01FFFE", "#FFA6FE", "#FFDB66", "#006401", "#010067", "#95003A",
"#007DB5", "#FF00F6", "#FFEEE8", "#774D00", "#90FB92", "#0076FF",
]
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
mby = count_movements(df, ["target", "year"])
mby["cumsum"] = mby.sort_values(by=["year"]).groupby(["target"])["counts"].transform(pd.Series.cumsum)
plt.subplots(figsize=(6, 15))
plt.style.use("dark_background")
sns.despine(left=True, bottom=True)
legends = []
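# plot from the newest year to the oldest: each bar's length is the cumulative count up to
# that year, so older (shorter) bars are drawn on top of newer ones, giving a stacked effect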
for year in reversed(sorted(set(mby["year"].to_numpy()))):
color = color_list[year - 2008]
ax = sns.barplot(x="cumsum", y="target", data=mby[mby["year"] == year], estimator=sum, errorbar=None, color=color)
ax.set(xlabel="Counts", ylabel="Movements")
legends.append(mpatches.Patch(color=color, label=f"{year}"))
plt.legend(handles=legends)
plt.show()