Scraping, Parsing and Analysis of the Crossfit competitions

a cartoonized close up picture of a man

2024-04-15

Gabriel Montpetit

In this post, we will check if it's possible to find patterns within crossfit competitions to see which movements are more likely to come up together.

More information on Market Basket Analysis (MBA) can be found here: https://www.turing.com/kb/market-basket-analysis

I also tried to reverse-engineer the games rest api but couldn't find anything for the workouts. If you are interested, I was able to use those endpoints:

Leaderboard

List of Open Workouts for 2023

Information for the opens 2023

I couldn't find the workouts endpoint so I chose to scrape the data instead.

  • Scrape the data from games.crossfit.com with beautiful soup.
  • Parse the content, extracting all the workouts.
  • Fetch a list of possible exercises online from wodcat's API.
  • Fuzzy match the exercises from the workouts to the exercises from the wodcat API.
  • Apply MBA with mlxtend
  • Create a bar plot of movements per year

There is a wide range of combinations, so let's reduce the scope to something a bit more manageable:

  • Focus on the Opens, Quarterfinals, Semifinals and Games.
  • No modalities such as weight or number of repetitions
  • No age groups: just Rx (men and women)
                  
%%capture
!pip install pandas==2.2.1
!pip install beautifulsoup4==4.12.3
!pip install tqdm==4.66.2
!pip install mlxtend==0.23.1
!pip install jellyfish==1.0.3
!pip install anytree==2.12.1
!pip install networkx==3.3
!pip install seaborn==0.13.2
                
                  
from __future__ import annotations

import dataclasses
import json
import re
import unicodedata
from dataclasses import dataclass
from enum import Enum
from http import HTTPStatus
from typing import Any, Callable
from urllib.parse import urljoin

import jellyfish
import networkx as nx
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from bs4 import BeautifulSoup, Tag, ResultSet
from mlxtend.frequent_patterns import association_rules
from mlxtend.frequent_patterns import fpgrowth
from mlxtend.preprocessing import TransactionEncoder
from networkx import connected_components
from tqdm import tqdm
                
                  
class Event(Enum):
    """Competition stages that the scraper distinguishes between."""
    OPEN = 1
    QUARTERFINALS = 2
    SEMIFINALS = 3
    GAMES = 4


@dataclass
class Workout:
    """A single scraped workout."""
    name: str  # workout title taken from the page heading
    year: int  # competition year the workout was scraped for
    event: Event  # competition stage the workout belongs to
    content: list[str]  # normalized, non-empty text lines describing the workout


@dataclass
class BaseUrl:
    """Describes where to scrape one competition stage and for which years."""
    base: str  # scheme + host, e.g. "https://games.crossfit.com"
    prefix: str  # path prefix; the year is appended to it as "<prefix>/<year>"
    min_year: int  # first year to scrape (inclusive)
    max_year: int  # last year to scrape (inclusive)
    event: Event  # competition stage stored on every scraped Workout
    # presumably years to skip while scraping — confirm against the scraping functions
    excluded_years: list[int] | None = None


# used to make it look like a normal browser
# (sent as the HTTP headers of every requests.get call in this notebook)
header = {
    "User-Agent": " ".join([
        "Mozilla/5.0",
        "(Linux; Android 10; K)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/114.0.0.0 Mobile Safari/537.36"
    ]),
    "X-Requested-With": "XMLHttpRequest"
}

# One BaseUrl per competition stage; min_year/max_year are both inclusive.
open_url = BaseUrl(
    base="https://games.crossfit.com",
    prefix="workouts/open",
    min_year=2011,
    max_year=2023,
    event=Event.OPEN
)
# The quarterfinals are scraped from the "onlinequalifiers" pages.
quarterfinals_url = BaseUrl(
    base="https://games.crossfit.com",
    prefix="workouts/onlinequalifiers",
    min_year=2014,
    max_year=2024,
    event=Event.QUARTERFINALS
)
semifinals_url = BaseUrl(
    base="https://games.crossfit.com",
    prefix="workouts/semifinals",
    min_year=2021,
    max_year=2023,
    event=Event.SEMIFINALS
)
games_url = BaseUrl(
    base="https://games.crossfit.com",
    prefix="workouts/games",
    min_year=2007,
    max_year=2023,
    event=Event.GAMES
)
                

In this section, we define utility methods:

  • extract_text: Used to extract and standardize text fields.
  • find_pre_2017: Parse pre-2017 open, quarterfinals and semifinals.
  • find_non_games_workouts: Used for opens, quarterfinals and semifinals.
  • find_quarter_finals_workouts: For quarterfinals.
  • find_semi_finals_workouts: For semifinals.
  • find_games_workouts: For games.
                  
def extract_text(results: ResultSet) -> list[str]:
    """Collect the normalized, non-empty text lines of every Tag in ``results``.

    Items that are not ``Tag`` instances are ignored. Each line is NFKD
    normalized and stripped of surrounding whitespace; lines that end up
    empty are dropped.
    :param results: The iterable of BeautifulSoup nodes to extract text from.
    :return: The cleaned lines, in document order.
    """
    lines = []
    for node in results:
        if not isinstance(node, Tag):
            continue
        for raw_line in node.get_text().splitlines():
            cleaned = unicodedata.normalize("NFKD", raw_line).strip()
            if len(cleaned) > 0:
                lines.append(cleaned)
    return lines
                
                  
def find_pre_2017(page: BeautifulSoup, year: int, event: Event) -> list[Workout]:
    """Extract the workouts of a pre-2017 competition page (older page layout).

    Every ``<li>`` whose id matches ``workoutsTab.*`` is one workout; its name
    is the ``h3.c-heading-data`` heading found at the same position on the page.
    :param page: The parsed competition page.
    :param year: The year the page belongs to.
    :param event: The type of event.
    :return: The list of Workouts found.
    """
    tabs = page.find_all("li", {
        "id": re.compile("workoutsTab.*")
    })
    names = [heading.get_text() for heading in page.find_all("h3", class_="c-heading-data")]

    workouts = []
    for position, tab in enumerate(tabs):
        lines = []
        for child in tab.find('section'):
            # only Tag children carry workout text
            if not isinstance(child, Tag):
                continue
            for piece in child.get_text().split("\n"):
                cleaned = unicodedata.normalize("NFKD", piece).strip()
                if cleaned:
                    lines.append(cleaned)
        workouts.append(Workout(
            name=names[position],
            year=year,
            event=event,
            content=lines
        ))
    return workouts
                
                  
def find_non_games_workouts(base_url: BaseUrl) -> list[Workout]:
    """Find all non-games workouts (opens, quarterfinals, semifinals) from a base_url.

    For every year in ``[min_year, max_year]`` the yearly page is downloaded.
    Pages before 2017 are parsed with ``find_pre_2017``; later pages list one
    link per workout inside a ``div.ordinals`` element, and each link is
    fetched and parsed individually.

    Years listed in ``base_url.excluded_years`` are skipped. The 2022
    Semifinals are always skipped, as in the original implementation.
    :param base_url: The BaseUrl dataclass.
    :return: The list of Workouts.
    """
    # excluded_years defaults to None, so normalize it to an empty set.
    excluded_years = set(base_url.excluded_years or [])
    workouts = []
    for year in tqdm(range(base_url.min_year, base_url.max_year + 1), desc=base_url.event.name):
        if year == 2022 and base_url.event == Event.SEMIFINALS:
            continue
        if year in excluded_years:
            continue
        get_response = requests.get(urljoin(base_url.base, "/".join([base_url.prefix, str(year)])), headers=header)
        soup = BeautifulSoup(get_response.text, features="lxml")
        if year < 2017:
            workouts.extend(find_pre_2017(soup, year, base_url.event))
        else:
            ordinals = soup.findChildren("div", class_="ordinals")
            for a in ordinals[0]:
                # children that are not Tags (e.g. whitespace strings) have no href
                if isinstance(a, Tag):
                    ordinal = requests.get(urljoin(base_url.base, a["href"]), headers=header)
                    ordinal_soup = BeautifulSoup(ordinal.text, features="lxml")
                    name = ordinal_soup.find_all("div", class_="heading")[0].text
                    content = extract_text(ordinal_soup.find_all("div", class_="exercises")[0])
                    workouts.append(
                        Workout(
                            name=name,
                            year=year,
                            event=base_url.event,
                            content=content
                        )
                    )
    return workouts
                
                  
def find_games_workouts(base_url: BaseUrl) -> list[Workout]:
    """Scrape every Games workout for the years described by ``base_url``.

    Each yearly page holds a ``div#events-details`` element; workout names are
    read from its ``div.events-heading`` children and the descriptions from its
    collapsible ``div.description`` children, matched by position.
    :param base_url: The BaseUrl dataclass of the games page.
    :return: All the Games workouts found.
    """
    workouts = []
    years = range(base_url.min_year, base_url.max_year + 1)
    for year in tqdm(years, desc=base_url.event.name):
        page_url = urljoin(base_url.base, "/".join([base_url.prefix, str(year)]))
        response = requests.get(page_url, headers=header)
        soup = BeautifulSoup(response.text, features="lxml")

        events_details = soup.find_all("div", id="events-details")
        container = events_details[0]
        names = [heading.text for heading in container.find_all("div", class_="events-heading")]
        descriptions = container.find_all("div", class_=["description collapse",
                                                         "description collapse in"])

        for idx, description in enumerate(descriptions):
            content = []
            for elem in description:
                if not isinstance(elem, Tag):
                    continue
                for line in elem.text.split("\n"):
                    if line:
                        content.append(unicodedata.normalize("NFKD", line))
            workouts.append(Workout(
                name=names[idx],
                year=year,
                event=Event.GAMES,
                content=content
            ))
    return workouts
                

Now that we have all our scraping methods, let's use them to find all the opens, quarter finals, semi finals and games workouts.

                  
def find_opens_workouts(base_url: BaseUrl) -> list[Workout]:
    """Scrape the Open workouts; thin wrapper around find_non_games_workouts.
    :param base_url: The BaseUrl dataclass of the opens pages.
    :return: The list of Workouts.
    """
    return find_non_games_workouts(base_url)


def find_quarter_finals_workouts(base_url: BaseUrl) -> list[Workout]:
    """Scrape the Quarterfinals workouts; thin wrapper around find_non_games_workouts.
    :param base_url: The BaseUrl dataclass of the quarterfinals pages.
    :return: The list of Workouts.
    """
    return find_non_games_workouts(base_url)


def find_semi_finals_workouts(base_url: BaseUrl) -> list[Workout]:
    """Scrape the Semifinals workouts; thin wrapper around find_non_games_workouts.
    :param base_url: The BaseUrl dataclass of the semifinals pages.
    :return: The list of Workouts.
    """
    return find_non_games_workouts(base_url)


# Scrape every competition stage (this performs the network requests).
all_opens_workouts = find_opens_workouts(open_url)
all_quarterfinals_workouts = find_quarter_finals_workouts(quarterfinals_url)
all_semifinals_workouts = find_semi_finals_workouts(semifinals_url)
all_games_workouts = find_games_workouts(games_url)
# Games workouts come first in the combined list, then opens, quarterfinals and semifinals.
all_workouts = all_games_workouts + all_opens_workouts + all_quarterfinals_workouts + all_semifinals_workouts
# sanity check: the run recorded in this post found 320 workouts
len(all_workouts)
                
                  
OPEN: 100%|██████████| 13/13 [01:16<00:00,  5.85s/it]
QUARTERFINALS: 100%|██████████| 11/11 [01:20<00:00,  7.34s/it]
SEMIFINALS: 100%|██████████| 3/3 [00:18<00:00,  6.16s/it]
GAMES: 100%|██████████| 17/17 [00:15<00:00,  1.09it/s]





320
                

Save a copy on disk so that we don't have to re-scrape the data:

                  
# save the workouts as json
# Build the serializable payload first so that a failure while converting a
# workout cannot leave a truncated file behind.
json_serializable = []
for w in all_workouts:
    d = dataclasses.asdict(w)
    # Enum members are not JSON serializable: store the member name instead.
    d["event"] = d["event"].name
    json_serializable.append(d)
# The with-block closes the file; no explicit close() is needed.
with open("all_workouts.json", "w") as f:
    json.dump(json_serializable, f, indent=4)
                
                  
# reload the saved copy (the with-block makes sure the file handle is closed)
with open("all_workouts.json", "r") as f:
    all_workouts = json.load(f)
# show the data
competitions_df = pd.DataFrame(all_workouts)
competitions_df.head(3)
                

name year event content
0 CrossFit Total 2007 GAMES [Back squat 1-rep max, Press 1-rep max, Deadli...
1 Hopper-Style CrossFit WOD 2007 GAMES [For time:, 1,000-meter row, Then, 5 rounds of...
2 Trail Run 2007 GAMES [Trail run (approximately 5 k)]
                  
def get_exercises(start_url: str) -> list[dict[str, Any]]:
    """Fetch all exercises from the paginated wodcat api.

    Pages are followed through the ``next`` field of each response until
    there is no further page. The pages are walked iteratively, so the
    number of pages is not limited by the recursion depth.

    If a page does not answer with HTTP 200, fetching stops and the
    exercises collected so far are returned (an empty list when the very
    first request fails), so the result is always a list.
    :param start_url: The starting point URL.
    :return: The list of exercises, where each exercise is a dictionary.
    """
    exercises: list[dict[str, Any]] = []
    next_url = start_url
    while next_url:
        http_response = requests.get(next_url, headers=header)
        if http_response.status_code != HTTPStatus.OK:
            break
        response_content = json.loads(http_response.text)
        exercises.extend(response_content["results"])
        # "next" is missing or empty on the last page, which ends the loop
        next_url = response_content.get("next")
    return exercises


# invoke
wodcat_exercises = get_exercises("https://wodcat.com/api/w/v3/exercises/")

# highest id handed out by the API; manually added exercises continue from it
max_id = max(e["id"] for e in wodcat_exercises)
missing = [
    {
        "slug": "ground-to-overheads",
        "name": "ground to overheads"
    }
]

# add some missing exercises
for m in missing:
    max_id += 1
    wodcat_exercises.append(
        {
            "id": max_id,
            **m
        }
    )

# save a copy in json format for future use
# (the with-block closes the file; no explicit close() is needed)
with open("exercises.json", "w") as f:
    json.dump(wodcat_exercises, f, indent=4)

# show data
pd.DataFrame(wodcat_exercises).head(3)
                

id slug name localname modality class_exercise equipment_type level video teaser abbreviation
0 2 air-squat Air Squat Air Squats G Squats Lung https://www.youtube.com/embed/C_VtOYc6j5c <p><strong>Air squat</strong>, also called a b...
1 224 alternating-dumbbell-snatches Alternating dumbbell snatches Alternating dumbbell snatches W Snatch Dumbbell Average https://www.youtube.com/embed/KP7A3cFazOc <p>The Dumbbell Snatch is variation of Snatch ...
2 180 alternating-floor-press Alternating floor press Alternating floor press W Presses Dumbbell Average https://www.youtube.com/embed/DJwNDf0XDys <p>This exercise is a species of basic &ldquo;...

I wasn't able to find the workouts from the games API, so I had to scrape the data instead. This came at a cost: the workouts are not as clean as I would like them to be. For example, "handstand" and "handstand walk" are not the same, but they should be. We need to find a way to match the workouts to the exercises from wodcat's API.

One way that I think will work is by fuzzy matching the workouts to the exercises. We can use the Jaro-Winkler Similarity to fuzzy match the workouts to the exercises. By "Tokenizing" the competition text into "Tokens", we will be able to compare Wodcat Tokens with Competition Tokens.

The "Tokenization" algorithm will take a minimum and maximum word count per token and create a sequence of Tokens with the min/max as the boundaries. It will also remove any capitalization, punctuation, dashes and extra whitespaces. We also assume that the words are already in the correct order (because we normally read from left to right in the west).

Example, given a minimum of 1 and a maximum of 3, the text "For time: 10 pig flips" will be tokenized into: ['for'], ['for', 'time'], ['for', 'time', '10'], ['time'], ['time', '10'], ['time', '10', 'pig'], ['10'], ['10', 'pig'], ['10', 'pig', 'flips'], ['pig'], ['pig', 'flips'], ['flips']

This is quite computationally expensive and likely would not scale well for larger text.

So we need to Tokenize both sources of data so that their tokens can be compared. For the Wodcat tokens, the minimum is equal to the maximum so that 1 Token == 1 Exercise.

Example, given the following:

Wodcat Token

Token(['pig', 'flips'])

Competition Tokens

Token(words=['for']), Token(words=['for', 'time']), Token(words=['for', 'time', '10']), Token(words=['time']), Token(words=['time', '10']), Token(words=['time', '10', 'pig']), Token(words=['10']), Token(words=['10', 'pig']), Token(words=['10', 'pig', 'flips']), Token(words=['pig']), Token(words=['pig', 'flips']), Token(words=['flips'])

would result in the following scores being generated:

[8, 7, 10, 8, 8, 8, 9, 7, 3, 6, 0, 4]

thus, with a score of 0, we know that at index 10, we have a match for "pig flips".

In order to do that, we will define 2 classes, Token and Text, that will help us tokenize the workouts and the exercises.

  • Token: A token is a list of words that can be compared to another token using the Jaro-Winkler Similarity distance.
  • Text: A text is a string that can be tokenized into a list of tokens. The Text class uses the Token class to tokenize the text.
                  
@dataclass
class Token:
    """An ordered group of words that is compared and clustered as one unit."""
    words: list[str]

    def sim_score(self, other: Token) -> float:
        """Return the Jaro similarity between this token and ``other``.

        Both tokens are compared as their words joined with single spaces.
        The words are used as they are; no case folding happens here.
        :param other: The other token to compare with.
        :return: The ``jellyfish.jaro_similarity`` of the two joined strings.
        """
        mine = " ".join(self.words)
        theirs = " ".join(other.words)
        return jellyfish.jaro_similarity(mine, theirs)

    @property
    def word_count(self) -> int:
        """Number of words in the token."""
        return len(self.words)

    @property
    def no_spaces_char_count(self) -> int:
        """Total number of characters over all words (separators not counted)."""
        return sum(len(word) for word in self.words)

    def __hash__(self):
        # Hash on the space-joined words so that equal tokens share a hash.
        joined = " ".join(self.words)
        return hash(joined)

    def __str__(self):
        return "_".join(self.words)


class Text:
    """A piece of text that can be normalized step by step and split into Tokens.

    Every normalization property returns a *new* ``Text`` and leaves the
    instance it was called on untouched, so the steps can be chained.
    """

    def __init__(self, text: str):
        self.text = text
        # number of words found by the last tokenize() call; None until then
        self._wc = None

    @property
    def uncapitalize(self) -> Text:
        """Remove any capitalization from a text.
        :return: The text without any capitalization.
        """
        return Text(self.text.lower())

    @property
    def remove_punctuation(self) -> Text:
        """Remove any punctuation from a text. Dashes are kept.
        :return: The text without any punctuation.
        """
        # Keep word characters, whitespace and dashes; everything else goes.
        return Text(re.sub(r'[^\w\s-]', '', self.text))

    @property
    def remove_dashes(self) -> Text:
        """Replace every dash with a space.
        :return: The text without any dashes.
        """
        return Text(re.sub(r'-', ' ', self.text))

    @property
    def remove_extra_whitespaces(self) -> Text:
        """Collapse every run of whitespace into a single space.
        :return: The text without any extra whitespaces.
        """
        return Text(re.sub(r'\s+', ' ', self.text))

    @property
    def replace_edge_cases(self) -> Text:
        """Replace any edge cases from a text. Example: sometimes, rope climbs are written as rope ascents.

        The replacements are applied one after the other, in the order they
        are declared below. The current instance is not modified.
        :return: The text with any edge cases replaced.
        """
        replacements = {
            "rope ascents": "rope climb",
            "snail push": "sled push",
            "muscle ups": "ring muscle ups",
            "double unders": "double unders jump rope",
            "pistol": "pistol squat",
            "single leg squat": "pistol squat",
            "echo bike": "assault air bike "
        }
        updated = self.text
        for old, new in replacements.items():
            updated = updated.replace(old, new)
        return Text(updated)

    def split(self, separator: str) -> list[str]:
        """Split a text into a list of words using a separator.
        :param separator: The separator to split the text with.
        :return: The list of words.
        """
        return self.text.split(separator)

    @property
    def word_count(self) -> int | None:
        """Word count computed by the last ``tokenize`` call (``None`` before it)."""
        return self._wc

    def tokenize(self, min_word_count: int = 5, max_word_count: int = 5) -> list[Token]:
        """Tokenize the text into every run of consecutive words whose length is within the given bounds.

        The text is first lower-cased, dashes become spaces, edge cases are
        replaced, punctuation is removed and whitespace is collapsed. Empty
        words (caused by leading or trailing whitespace) are discarded.
        When the text has at most ``min_word_count`` words, a single Token
        holding all the words is returned.
        Note that the default value is the maximum word count from wodcat's API.
        :param min_word_count: The minimum token word count value. Example: 2 would mean that one token should take a minimum of 2 words.
        :param max_word_count: The maximum token word count value. Example: 5 would be 1 token can take up to 5 words.
        :return: The list of Tokens.
        :raises AttributeError: When min_word_count is greater than max_word_count.
        """
        if min_word_count > max_word_count:
            raise AttributeError("min_word_count should be less than or equal to max_word_count")

        normalized = (
            self.uncapitalize.remove_dashes.replace_edge_cases.remove_punctuation.remove_extra_whitespaces
        )
        # A leading/trailing space would otherwise yield "" as a word.
        words = [word for word in normalized.split(" ") if word]
        self._wc = len(words)

        if self._wc <= min_word_count:
            return [Token(words)]
        tokens = []

        for start in range(len(words) - min_word_count + 1):
            for size in range(min_word_count, max_word_count + 1):
                # the window would run past the last word
                if start + size > len(words):
                    break
                tokens.append(
                    Token(words[start: start + size])
                )
        return tokens

                

Tokenize

Now that we have our utility classes, we can start tokenizing Wodcat exercises and the competitions exercises.

Let's start with the Wodcat:

                  
# reload the saved exercises (the with-block makes sure the file handle is closed)
with open("exercises.json", "r") as f:
    wodcat_exercises = json.load(f)

flatten = lambda nested: [item for x in nested for item in x]
# tokenize every exercise name, in ascending exercise id order
wodcat_tokens = flatten([Text(x["name"]).tokenize() for x in sorted(wodcat_exercises, key=lambda x: x["id"])])
wodcat_series = pd.Series(wodcat_tokens, name="tokens")
wodcat_series.head(3)
                
                  
0      air_squat
1     back_squat
2    front_squat
Name: tokens, dtype: object
                

Tokenize Competitions Text

In this section, we convert all the competitions text into tokens so that we can assign scores to the Token.

First, let’s check the maximum token size for the exercises from wodcat so that we can limit the tokenization process. We don’t want to create Tokens that are 6 words long if the maximum Wodcat Tokens are 5 words long.

Then we can tokenize the competitions text.

                  
# longest wodcat token, in words: competition tokens never need to be longer than that
max_word_count = max([x.word_count for x in wodcat_tokens])
# join the content lines of each workout into one string, then tokenize it
# into every run of 1..max_word_count consecutive words
tokenized_df = competitions_df.assign(**{
    "tokens": competitions_df["content"].apply(lambda x: " ".join(x)).apply(
        lambda z: Text(z).tokenize(min_word_count=1, max_word_count=max_word_count))
})
tokenized_df.sample(3)
                

name year event content tokens
15 Amanda 2010 GAMES [Amanda, For time:, 9-7-5, Muscle-ups, Squat S... [amanda, amanda_for, amanda_for_time, amanda_f...
279 Workout 2 2019 QUARTERFINALS [For time:, 80 bar-facing burpees, 4,000-meter... [for, for_time, for_time_80, for_time_80_bar, ...
106 Assault Banger 2017 GAMES [For time:, 40/30-cal. Assault Bike, 20-ft. Ba... [for, for_time, for_time_4030, for_time_4030_c...

Fuzzy Match Score

Now we can define a score and a get_best_match method that will perform the following tasks:

  • score: This method computes the Jaro similarity (via jellyfish.jaro_similarity) between every competition Token and every Wodcat Token.
  • get_best_match: This method returns the best match (given a threshold) for a row. Here we retrieve the indices.
                  
def score(c_tokens: list[Token], w_tokens: list[Token]) -> np.ndarray:
    """Build the similarity matrix between competition tokens and wodcat tokens.

    Row ``i`` holds ``c_tokens[i].sim_score(w)`` for every ``w`` in
    ``w_tokens``, in order.
    :param c_tokens: The competition tokens (one row each).
    :param w_tokens: The wodcat tokens (one column each).
    :return: The scores.
    """
    rows = [
        np.array([competition_token.sim_score(wodcat_token) for wodcat_token in w_tokens])
        for competition_token in c_tokens
    ]
    return np.array(rows)


def get_best_match(scores: pd.Series, threshold: float = 0.87) -> list[Token]:
    """Return the wodcat tokens whose score is strictly above ``threshold``.

    ``scores`` is the 2D score matrix of one workout; the column index of
    every cell above the threshold is looked up in the module-level
    ``wodcat_series``. A wodcat token is returned once per matching cell.
    :param scores: The 2D array of similarity scores of one row.
    :param threshold: The threshold to consider a match.
    :return: The matching wodcat tokens.
    """
    above_threshold = scores > threshold
    wodcat_positions = np.where(above_threshold)[1].tolist()
    matched = wodcat_series.iloc[wodcat_positions]
    return matched.to_numpy().tolist()


# score the tokens of every workout against all the wodcat tokens
# (one 2D array per row: competition tokens x wodcat tokens)
tokenized_df["score"] = tokenized_df["tokens"].apply(
    lambda tokens: score(tokens, wodcat_tokens)
)
tokenized_df.head(3)
                

name year event content tokens score
0 CrossFit Total 2007 GAMES [Back squat 1-rep max, Press 1-rep max, Deadli... [back, back_squat, back_squat_1, back_squat_1_... [[0.4537037037037037, 0.7999999999999999, 0.0,...
1 Hopper-Style CrossFit WOD 2007 GAMES [For time:, 1,000-meter row, Then, 5 rounds of... [for, for_time, for_time_1000, for_time_1000_m... [[0.48148148148148145, 0.0, 0.6464646464646465...
2 Trail Run 2007 GAMES [Trail run (approximately 5 k)] [trail, trail_run, trail_run_approximately, tr... [[0.5333333333333333, 0.43333333333333335, 0.3...

Now we can retrieve the best matches for each workout's Tokens.

                  
# find the best matches
# (get_best_match is applied with its default threshold to each score matrix)
matches_df = tokenized_df.assign(**{
    "best_match": tokenized_df["score"].apply(get_best_match)
})
matches_df[["name", "year", "event", "content", "best_match"]]
                

name year event content best_match
0 CrossFit Total 2007 GAMES [Back squat 1-rep max, Press 1-rep max, Deadli... [back_squat, back_squat, back_squat, axle_dead...
1 Hopper-Style CrossFit WOD 2007 GAMES [For time:, 1,000-meter row, Then, 5 rounds of... [row, pull_up, pull_up, push_jerk, jerk]
2 Trail Run 2007 GAMES [Trail run (approximately 5 k)] [run]
3 Sunday's Workout 2008 GAMES [For time:, 30 Squat Clean and Jerks, (155/100... [squat_clean, squat_clean, clean, clean_and_je...
4 The Hill Run 2008 GAMES [The hill run is a steep, off-trail run approx... [run, run, run, run]
... ... ... ... ... ...
315 Individual Workout 3 2023 SEMIFINALS [For time: Semifinals Linda — 10, 9, 8, 7, 6, ... [deadlift, dumbbell_snatch, dumbbell_clean, be...
316 Individual Workout 4 2023 SEMIFINALS [For load:, 800-m Assault AirRunner, Max snatc... [snatch, rest]
317 Individual Workout 5 2023 SEMIFINALS [For time:, 8 snatches, 800-m Assault AirRunne... [snatch]
318 Individual Workout 6 2023 SEMIFINALS [For time:, 20 overhead squats, 500-m row, 3 h... [overhead_walk, overhead_squat, overhead_squat...
319 Individual Workout 7 2023 SEMIFINALS [3 rounds for time of:, 15/10 Echo Bike calori... [assault_air_bike, assault_air_bike, assault_a...

320 rows × 5 columns

At first glance from the above results, it appears as if everything is fine. However, upon inspection, we can observe 3 things:

  • Duplicates
  • Duplicates with shorter tokens (example: "handstand" and "strict handstand")
  • Incorrect matches (or partial matches portrayed as correct movements). For example "handstand push up" and "push ups" are not the same but because the matches are correct for "push ups", it's considered a match.

Simple Solution So to fix that problem we can use a simple solution:

For problem nb. 1: Transform the lists to sets (Token has the __hash__ method implemented). For problems nb. 2 and 3: Consider only the longer worded Token when a Token's words are contained within another Token. Caveat: we may actually drop valid exercises. So in order to do this we could:

  • sort the list of Token by ascending size
  • generate a "dynamic" regex with the head of the array and compare it to the subsequent tail
  • use a networkx to find all the subgraphs and only keep the maximum token size

Let's add this functionality

Deduplication and Clustering

                  
def compare_tokens(left: Token, right: Token, min_overlap: int = 4) -> int:
    """Return the length of the longest prefix of ``left`` that occurs inside ``right``.

    Both tokens are compared with their words concatenated without spaces.
    Only prefixes of at least ``min_overlap`` characters are considered.
    The left token **must not** have more characters than the right token.
    :param left: The left token (not longer than right).
    :param right: The right token.
    :param min_overlap: The minimum number of characters that must be overlapping to consider the tokens identical.
    :return: The number of overlapping characters, or 0 when no prefix of at least
        ``min_overlap`` characters is found.
    :raises AttributeError: When left has more characters than right.
    """
    if left.no_spaces_char_count > right.no_spaces_char_count:
        raise AttributeError("Left length must be smaller than Right length.")

    left_string = "".join(left.words)
    right_string = "".join(right.words)

    # Try the longest prefix first so that the first hit is the best one.
    # Plain substring search: no regex, hence nothing to escape.
    shortest = max(min_overlap, 1)
    for size in range(len(left_string), shortest - 1, -1):
        if left_string[:size] in right_string:
            return size
    return 0
                
                  
def cluster_tokens(tokens: list[Token], threshold: int = 4) -> list[Token]:
    """Group overlapping tokens together and keep the longest token of each group.

    Two tokens are linked when ``compare_tokens`` reports at least ``threshold``
    overlapping characters. Linked tokens form the connected components of a
    graph; each component is represented by its token with the most characters.
    :param tokens: The list of Tokens.
    :param threshold: The minimum overlapping threshold.
    :return: A list of Tokens, where each token represents the clustered Tokens.
        Example: 'pull up' and 'chest to bar pull ups' are grouped into 'chest to bar pull ups'
        because they have a minimum of 4 overlapping sequential characters.
    """

    def char_count(token):
        return token.no_spaces_char_count

    # shortest first, so that the left argument of compare_tokens is never the longer one
    ordered = sorted(tokens, key=char_count)
    graph = nx.Graph()
    for position, shorter in enumerate(ordered):
        for longer in ordered[position + 1:]:
            if compare_tokens(shorter, longer, threshold) >= threshold:
                graph.add_edge(longer, shorter)
        # a token without any link still has to end up in the graph
        if not graph.has_node(shorter):
            graph.add_edge(shorter, shorter)

    representatives = []
    for component in connected_components(graph):
        members = set(graph.subgraph(component))
        representatives.append(max(members, key=char_count))
    return representatives
                
                  
# clusters: the de-duplicated Tokens of each workout (see cluster_tokens)
# basket:   the basket identifier, built as "<name>_<year>"
# target:   the clusters as plain strings, i.e. the items of the basket
# NOTE(review): the basket key does not include the event, so two workouts that
# share a name and a year in different events would end up in the same basket
# when grouping by "basket" — confirm whether that is intended.
df = matches_df[["name", "year", "event", "best_match"]].assign(**{
    "clusters": matches_df["best_match"].apply(cluster_tokens),
    "basket": lambda row: row.apply(
        lambda x: f'{x["name"]}_{x["year"]}', axis=1
    )
}).assign(**{
    "target": lambda row: row["clusters"].apply(
        lambda x: [str(token) for token in x]
    )
})
# keep a copy of the baskets on disk
df[["basket", "clusters", "target"]].to_json("./baskets.json", orient="records", indent=4)
df.head(3)
                

name year event best_match clusters basket target
0 CrossFit Total 2007 GAMES [back_squat, back_squat, back_squat, axle_dead... [axle_deadlift, back_squat] CrossFit Total_2007 [axle_deadlift, back_squat]
1 Hopper-Style CrossFit WOD 2007 GAMES [row, pull_up, pull_up, push_jerk, jerk] [row, push_jerk, pull_up] Hopper-Style CrossFit WOD_2007 [row, push_jerk, pull_up]
2 Trail Run 2007 GAMES [run] [run] Trail Run_2007 [run]

Now we can use the dataset to find any patterns inside the data (see mlxtend )

First we create a utility method so that we can re-use it with different parameters:

                  
def mba(
        source_df: pd.DataFrame,
        target_column: str,
        agg_cols: list[str],
        min_support: float = 0.5,
        metric: str = "confidence",
        min_threshold: float = 0.3,
        custom_filter: Callable[[pd.DataFrame], pd.DataFrame] | None = None
) -> pd.DataFrame:
    """Run a Market Basket Analysis on ``source_df`` and return its association rules.

    The item lists of ``target_column`` are concatenated per ``agg_cols`` group
    and de-duplicated, one-hot encoded, mined with ``fpgrowth`` and finally
    turned into association rules.
    :param source_df: The input dataframe.
    :param target_column: The items column (each cell is a list of items).
    :param agg_cols: The aggregation columns (ie. the basket).
    :param min_support: The minimum support passed to ``fpgrowth``.
    :param metric: 'support', 'confidence', 'lift', 'leverage', 'conviction' and 'zhangs_metric'. This parameter is tied to min_threshold.
    :param min_threshold: The minimum value of ``metric`` for a rule to be kept.
    :param custom_filter: Extra filter applied to the rules through ``.loc``.
    :return: The Association Rules dataframe.
    :raises ValueError: When ``fpgrowth`` finds no frequent itemset.
    """
    # one row per basket: "sum" concatenates the item lists of the group
    baskets = source_df.groupby(agg_cols).agg({
        target_column: "sum"
    })
    # an item counts once per basket
    baskets = baskets.assign(**{
        target_column: baskets[target_column].apply(lambda items: list(set(items)))
    })

    transactions = baskets[target_column].to_numpy()
    encoder = TransactionEncoder()
    encoder.fit(transactions)
    one_hot = pd.DataFrame(encoder.transform(transactions), columns=encoder.columns_)

    frequent = fpgrowth(one_hot, min_support=min_support, use_colnames=True)
    if len(frequent) == 0:
        raise ValueError("fpgrowth results empty. Try changing the min_support param ?")

    rules = association_rules(frequent, metric=metric, min_threshold=min_threshold)
    if not custom_filter:
        return rules
    return rules.loc[custom_filter]
                

MBA #1

Now we can check if there are any patterns with the exercises:

                  
# one basket per workout; metric and min_threshold keep their defaults (confidence, 0.3)
mba(df, "target", ["basket"], min_support=0.1)
                

antecedents consequents antecedent support consequent support support confidence lift leverage conviction zhangs_metric
0 (ring_muscle_up) (muscle_up_strict) 0.138710 0.135484 0.129032 0.930233 6.866002 0.110239 12.391398 0.991948
1 (muscle_up_strict) (ring_muscle_up) 0.135484 0.138710 0.129032 0.952381 6.866002 0.110239 18.087097 0.988246

Interpretation

Given a minimum support of 0.1 and by using the confidence metric, we can see that the only 2 exercises that pop up are muscle_up_strict and ring_muscle_up. For example, when a workout contains "muscle_up_strict", we have a confidence of 95% that "ring_muscle_up" is also matched in the same workout. However, when we also consider the consequent support of 0.13, it is not very conclusive.

Next Steps

Let's try at least to push it a little bit more so we can have more insights.

Decrease Granularity

So far, we tried to find some trends at the Workout level, maybe we can do that, but per year and event type instead.

MBA #2

                  
# one basket per (year, event); min_support keeps its default of 0.5 and
# only the rules with support * confidence > 0.65 are kept
mba(df, "target", ["year", "event"], custom_filter=lambda x: (x["support"] * x["confidence"]) > 0.65)
                

antecedents consequents antecedent support consequent support support confidence lift leverage conviction zhangs_metric
228 (ring_muscle_up) (muscle_up_strict) 0.860465 0.837209 0.813953 0.945946 1.129880 0.093564 3.011628 0.823810
229 (muscle_up_strict) (ring_muscle_up) 0.837209 0.860465 0.813953 0.972222 1.129880 0.093564 5.023256 0.706122
230 (ring_muscle_up) (toes_to_bar) 0.860465 0.813953 0.767442 0.891892 1.095753 0.067063 1.720930 0.626263
231 (toes_to_bar) (ring_muscle_up) 0.813953 0.860465 0.767442 0.942857 1.095753 0.067063 2.441860 0.469697
232 (muscle_up_strict) (toes_to_bar) 0.837209 0.813953 0.744186 0.888889 1.092063 0.062737 1.674419 0.517857
233 (toes_to_bar) (muscle_up_strict) 0.813953 0.837209 0.744186 0.914286 1.092063 0.062737 1.899225 0.453125
235 (ring_muscle_up, toes_to_bar) (muscle_up_strict) 0.767442 0.837209 0.720930 0.939394 1.122054 0.078421 2.686047 0.467742
236 (muscle_up_strict, toes_to_bar) (ring_muscle_up) 0.744186 0.860465 0.720930 0.968750 1.125845 0.080584 4.465116 0.436950
241 (double_unders_jump_rope_jump) (ring_muscle_up) 0.813953 0.860465 0.744186 0.914286 1.062548 0.043807 1.627907 0.316406
247 (ring_muscle_up, double_unders_jump_rope_jump) (toes_to_bar) 0.744186 0.813953 0.697674 0.937500 1.151786 0.091942 2.976744 0.515152
248 (toes_to_bar, double_unders_jump_rope_jump) (ring_muscle_up) 0.720930 0.860465 0.697674 0.967742 1.124673 0.077339 4.325581 0.397222
253 (ring_muscle_up, double_unders_jump_rope_jump) (muscle_up_strict) 0.744186 0.837209 0.720930 0.968750 1.157118 0.097891 5.209302 0.530792
254 (muscle_up_strict, double_unders_jump_rope_jump) (ring_muscle_up) 0.720930 0.860465 0.720930 1.000000 1.162162 0.100595 inf 0.500000
266 (ring_muscle_up, toes_to_bar, double_unders_ju... (muscle_up_strict) 0.697674 0.837209 0.674419 0.966667 1.154630 0.090319 4.883721 0.442971
267 (muscle_up_strict, toes_to_bar, double_unders_... (ring_muscle_up) 0.674419 0.860465 0.674419 1.000000 1.162162 0.094105 inf 0.428571

Okay, now we kinda got something but it's not very conclusive. When considering both support and confidence (ie. support * confidence > 0.65), we can see that when toes to bar come up (antecedents), they also likely come up with ring muscle up (consequents).

Likewise, when there are double unders and ring muscle ups (antecedents), it seems that toes to bar also come up (consequents). Looks like nuking the forearms is somewhat consistent.

Let's just check one last thing: what are the most common exercises by count ?

Plotting Utils

                  
def count_movements(
        df: pd.DataFrame,
        aggregation: list[str],
        filter_: Callable = lambda data: (data["target"] != "rest") & (data["counts"] > 1)
) -> pd.DataFrame:
    """Build per-group movement counts next to each movement's overall total, for plotting.

    Each row of ``df`` is first expanded into one row per entry of its ``target`` value, so
    every movement occurrence is counted individually.
    :param df: The source dataframe; it must provide the "name", "year", "event" and "target" columns.
    :param aggregation: The columns to group by. "target" has to be one of them, because the
        per-group counts are joined with the overall totals on that column.
    :param filter_: A row selector handed to ``.loc`` on the sorted result. By default it drops
        the "rest" movement and every group whose count is not greater than 1.
    :return: One row per group with the aggregation columns plus "counts" (occurrences in the
        group), "total" (occurrences of the movement overall) and "percent" (counts as a rounded
        integer percentage of total), ordered by "total" descending.
    """
    occurrences = df.explode("target")[["name", "year", "event", "target"]]

    # Occurrences per aggregation group; the counted column is renamed before the index is
    # restored so it does not clash with the "target" grouping key.
    per_group = occurrences.groupby(aggregation).agg({"target": "count"})
    per_group = per_group.rename(columns={"target": "counts"}).reset_index()

    # Occurrences per movement across the whole dataset.
    per_movement = occurrences[["target"]].value_counts().reset_index(name="total")

    combined = pd.merge(per_group, per_movement, on=["target"])
    combined = combined.assign(
        percent=lambda frame: (frame["counts"] / frame["total"]).mul(100).round().astype(int)
    )
    ranked = combined.sort_values(by="total", ascending=False)
    return ranked.loc[filter_]
                
                  
# Distinctive colors used to tell the years apart in the stacked bar plot.
# The plotting loop looks a color up by the year's offset from 2008, so the
# 16 entries line up with the years 2008 through 2023 (entry 0 is 2008).
color_list = [
    "#000000", "#00FF00", "#0000FF", "#FF0000", "#01FFFE", "#FFA6FE", "#FFDB66", "#006401", "#010067", "#95003A",
    "#007DB5", "#FF00F6", "#FFEEE8", "#774D00", "#90FB92", "#0076FF",
]
                
                  
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# Movement counts per (movement, year), plus a running total over the years for each
# movement, so every year's bar can be drawn as the cumulative count up to that year.
mby = count_movements(df, ["target", "year"])
mby["cumsum"] = mby.sort_values(by=["year"]).groupby(["target"])["counts"].transform(pd.Series.cumsum)

# A style sheet only affects artists created after it is applied, so it must be set
# before the figure and axes exist; otherwise the first run renders with the default look.
plt.style.use("dark_background")
fig, ax = plt.subplots(figsize=(6, 15))

sns.despine(left=True, bottom=True)
legends = []
# Latest year first: its cumulative bar is the longest, and each earlier year is then
# drawn on top of it, which yields the stacked-per-year appearance.
for year in sorted(mby["year"].unique(), reverse=True):
    # color_list[0] belongs to 2008. The modulo keeps the lookup inside the palette for
    # years past its end (it wraps around) instead of raising IndexError; years that
    # already resolved to a color keep exactly the same one.
    # NOTE(review): assumes "year" holds integers - confirm against the scraping step.
    color = color_list[(year - 2008) % len(color_list)]
    sns.barplot(
        x="cumsum", y="target", data=mby[mby["year"] == year],
        estimator=sum, errorbar=None, color=color, ax=ax,
    )
    legends.append(mpatches.Patch(color=color, label=f"{year}"))

# The axis labels do not depend on the year, so they are set once after the loop.
ax.set(xlabel="Counts", ylabel="Movements")
plt.legend(handles=legends)
plt.show()
                

png