This is the third post in a three part series on creating a reusable ML pipeline that is initiated with a single config file and five user-defined functions. The pipeline is finetuning-based for the purposes of classification, runs on distributed GPUs on AWS Sagemaker and uses Huggingface Transformers, Accelerate, Datasets & Evaluate, PyTorch, wandb and more.
This post will cover the training and testing (inference) steps. These are the core steps in a ML pipeline where a model is hyper-parameter tuned and the test set is used to measure performance. If you have landed on this post first, check out the first post in the series detailing the pipeline setup and the second post detailing the data steps.
Training and Tuning
The reason I have combined Training and Tuning into one section is that Tuning just is a set of training jobs where performance is incrementally improved through the changing of hyperparameters. As such, underneath the covers, the two types of jobs are calling the same code. Like we have previously, let’s take a look first at perform_training()
and perform_tuning()
to see how the code interacts with Sagemaker.
Zooming into perform_training()
, we encounter the first bit of backend code that handles a use case we have not yet discussed: comparing two models. If you recall in part one, one of the motivations for creating this pipeline was to rapidly test multiple Document Understanding models and compare performance between them. As such, the pipeline is built to handle, in a single experiment, multiple models being passed in the settings.ini
file the experimenter defines. In fact, the MODEL_NAMES
parameter from this file can accept one or many model names, the latter implying that the experimenter wants to run a comparison job. A comparison job has no impact on Data Reconciliation or Data Preparation; we want these steps to be isomorphic to a single model job as the idea is that n models get trained and tested on the exact same snapshot of training data. With that preamble, perform_training()
looks like this:
The loop here is iterating over either a list with n model names or a list with a single model name. For each model name, an Estimator() is constructed and .fit()
is called which kicks off a training job on Sagemaker. get_estimator_kwargs()
will look familiar to anyone who has trained on Sagemaker already:
Settings are extracted from the config we discussed in the first post in the series, the most important of which is config.docker_image_path
. As a refresher, this is the ECR URL of the training image the experimenter created in the setup that is used between Sagemaker Processor/Training/Tuning jobs and contains all needed dependencies. Next, perform_training
checks a boolean from the settings.ini file, USE_DISTRIBUTED
which defines whether or not the experimenter expects distributed GPU training to occur. If so, it sets some extra Estimator parameters which are largely inspired by the _distribution_configuration function from the sagemaker-sdk.
I will digress for a moment here to talk about one such parameter, namely, an environment variable called USE_SMDEBUG
. SMDEBUG refers to a debugging tool called Sagemaker Debugger. For reasons I cannot explain and have not been answered by AWSlabs, this tool is on by default and distributed training would not work for some models, producing mysterious exception traces. It only became obvious to me when carefully examining the traces and seeing that it was some code in smdebug that was ultimately throwing. Furthermore, there are a variety of ways to turn off smdebug, for instance passing 'debugger_hook_config': False
as done above or environment={‘USE_SMDEBUG’:0}
. However, these methods only work on Training jobs. Again, for reasons I cannot explain, the only way to turn off SMDEBUG on Tuning jobs is to set the env var inside the docker container being used: ENV USE_SMDEBUG="0"
; the other methods explained above somehow never make it to a Tuning jobs constituent Training jobs. An unfortunate side effect of this is that it makes it difficult for an experimenter to configure this environment variable. At any rate, hopefully AWSlabs fixes and or makes smdebug exceptions more user friendly.
The call to .fit()
makes the actual call to the AWS API. The config.training_data_uri
parameter specifies the S3 URI of the encoded training data from the Data Preparation step; the training instance will download this data to local disk before it executes where it can be easily accessed by multiple GPU processes. How does the job know what code to execute? That is specified in the base docker container which is extended by the experimenter:
These environment variables are used by the sagemaker-training
library to kick off the training script. At this point we would dive into train.py,but since it is also used by a Tuning job, let’s take a look at how we kick off a Tuning job. The beginning of a Tuning job is nearly identical to a Training job:
But now, instead of calling .fit(), we need to set up a few more parameters a Tuning job requires. A Tuning job requires a set of constant hyperparameters and tunable hyperparameters. As such, here an example of what an experimenter might write in the settings.ini
file to represent this:
Here the constants will not change between tuning jobs, but the tunable parameters will start with guesses and those guesses will get better as jobs complete. The ->
and ,
are syntax I’ve chosen; in this context ->
stands for an interval while ,
stands for categorial options. Having seen this, the next piece of the Tuning job setup should make sense:
Now we have our dict of tunable parameters we can pass to the HyperparameterTuner object:
This should look somewhat familiar to what we just did for Training with a few extra parameters. So far, the HyperparameterTuner
object takes the constructed Estimator()
object that will be re-used for each constituent Training job and the tunable hyperparameters we just discussed. A Tuning job needs to measure a metric in order to decide if one set of hyperparameters are better than another. objective_metric_name
is the name of that metric. This value is also used in the metric_definitions
parameter which explicitly defines how the HyperparameterTuner job can extract the objective metric value from the logs for comparison. To make this more concrete, this is how these values are defined in an example settings.ini file:
Finally, the max_jobs
parameter defines how many total Training jobs will constitute the Tuning job and max_parallel_jobs
defines how many can run in parallel at a given time. Like the Estimator
in the Training job, we call fit()
to actually kick off the Tuning job and pass it the training_data_uri
like we did previously. With this in place, we can now look at train.py
and see what executes when a Training or Tuning job is executed.
The goal of train.py is to fine tune a loaded model using a set of distributed GPUs, compute a number of metrics, determine which is the best model, extract that model’s state_dict, convert that model to torchscript, and save these files along with a number of graphs to S3. Huggingface’s Accelerate, Evaluate and Transformers libraries are all used to greatly simplify this process. Before continuing, I have to give a brief shoutout to the Accelerate devs who were extremely responsive while I was building this pipeline.
Note that in a distributed setting, every GPU process is going to execute this same train.py file. While much of this coordination can be passed off to Accelerate, it is helpful to understand that while working inside it. Diving a level deeper, train.py is going to:
- Read hyperparameters and determine if the running job is a tuning job, training job or comparison job
- Determine if gradient accumulation will be utilized
- Construct the `Accelerator()` object which handles distribution
- Initialize wandb trackers
- Load split training data and create `Dataloader()`s for training and validation
- Set up an optimizer with learning rate scheduling
- Execute a training and validation loop, computing metrics and storing metric histories and determining what the best model was
- Plot curves for metrics
- Extract the curves, statistics and best model from the loops
- Write all of this data to S3
We start by reading the passed hyperparameters and setting a few values that can be used throughout the training process:
_tuning_objective_metric
is a hyperparamter set by Sagemaker that allows us to easily differentiate between Training and Tuning jobs. As we’ve mentioned before, the run_num is an important setting that allows us to organize our results and version our models in production so they easily connect back to training runs. Finally, job_type_str
allows us to further organize our runs as training / tuning and comparison jobs.
Next we determine if gradient accumulation is needed. Briefly, gradient accumulation allows us to set batch sizes that are larger than what the GPUs we’re running on can store in memory:
Control now moves to setting up the Accelerator() object which is the tool for managing distributed processing:
Here we encounter a core concept in Accelerate, is_main_process
. This boolean provides a simple way to execute code on one of the distributed processes. This is helpful if we want to run code as if we’re on a single process; for instance if we want to store a history of metrics as the training loop executes. We use this boolean to set up wandb so we can easily log metrics to wandb. Additionally, accelerator.print()
is similar to if accelerator.is_main_process print(...)
, it ensures whatever statement is only printed once.
Recall that we passed config.training_data_uri
to the .fit()
call for both Training and Tuning jobs. This downloads all of the training data to the Sagemaker instance’s local disk. Thus, we can use Datasets load_from_disk()
function to load this data. Note in the following code SAGEMAKER_LOCAL_TRAINING_DIR
is just the path to the dir that data is downloaded to.
Each process loads the dataset, id2label file, metrics and creates dataloaders. Note the use of Huggingface’s evaluate library to load metrics; these can be used in tandem with Accelerate to make metric tracking simple during distributed training. We will see shortly how Accelerator provides one simple function to handle distributed training.
In this code block, we first call the user-defined function load_model
to receive the loaded model defined however the experimenter would like. Thus far, this function has typically looked like a call to a Transformers from_pretrained()
function, though this is not enforced.
A common learning rate optimizer is created and used to create a learning rate scheduler. Finally, we encounter another core concept in Accelerator, namely, wait_for_everyone()
. This function guarantees that all processes have made it to this point before proceeding to the next line of code. It must be called before the prepare()
function which prepares all of the values we’ve created thus far for training (in our case, distributed training). wait_for_everyone()
is used regularly in Accelerator code; for example, it is nice to have when ensuring that all GPUs have completed the training loop. After the prepare()
step, the code enters a function to perform the training and validation loop. Next, we will look at how Accelerator works inside that loop.
At the start of the loop, we initialize a number of values to track throughout training. Here we use is_main_process
again to create a single version of metric histories which we will use to plot graphs. In this example, we are only tracking training loss, validation accuracy and f1, but any number of metrics could be tracked here. Next, we enter the loop, set the model in train()
mode and enter the train()
function:
As execution enters a batch, it first needs to check if we’re running a comparison job. If so, it needs to extract the appropriate parameters for the current model’s forward()
function. If you recall, for comparison jobs, in the Data Preparation step we combined all inputs in the same pyarrow format, but prepended with the model_name
(e.g. longformer_input_ids). get_model_specific_batch()
just returns those parameters of the batch that match the current model_name
.
Next, we encounter with accelerator.accumulate(model)
, a context manager that recently came out in Accelerate that manages gradient accumulation. This simple wrapper reduces gradient accumulation to a single line. Underneath that manager, back propagation should look familiar to readers who have written ML code before, the one big difference is calling accelerator.backward(loss)
instead of loss.backward()
.
Upon completing a training batch, execution sets the model in .eval()
mode and moves into the validation loop:
Here we encounter another key accelerate function, gather_for_metrics()
. This recently added function makes it much easier to gather predictions in a distributed setting so they can be used to calculate metrics. We pass the returned values to the f1_metric
and acc_metric
objects we created earlier using the Evaluate library. The validation loop then computes the scores and returns them.
After sending the batch through training and validation, we perform tracking on the values we initialized at the beginning:
Since is_main_process
contains the references to our history-tracking datastructures, we use it to append our new values. accelerator.log
links up with the init_trackers
call we made earlier: .log
sends these values to the tracker earlier initialized. In our case wandb will create graphs out of these values. Finally we use the F1 score to determine the best model over time.
After the training and validation loop is done, we execute:
We start by ensuring that all processes have completed the training/validation loop and then call unwrap_model
to extract the model from its distributed containers. Since the main process contains our metric histories, we use it to plot curves for each metric and calculate model statistics; we then return out the best model, curves and statistics.
Now that the training/validation loops are complete and we’ve determined a best model, we need to convert that best model to torchscript and save all the returned files to S3.
Here we call end_training
since we are using wandb and use is_main_process
since we no longer need distribution. accelerator.save()
is the correct way to save the model to disk, but we need to convert it to torchscript to mirror production as closely as possible. Briefly, Torchscript is a way of converting a python-based model into a serializable, production-friendly format that need not have a python dependency. As such, when testing inference on an unseen test set, it is best to test on the model that would be in production. One way to convert a model is to call torch.jit.trace
passing it the model and a sample instance which is how we’ve implemented the conversion:
First, we take the best model and put it in CPU and evaluation mode. We then grab a sample instance out of the training data. Next, we encounter another user-defined function ordered_input_keys()
. If you recall, this function returns the parameter names for a model’s forward()
function in the correct order. It probably didn’t make sense earlier why this function was needed, but now it should: the example_inputs
parameter of torch.jit.trace
takes a tuple of input values which must match the exact parameter ordering of the forward()
function.
Now, if we’re running a comparison job, then ordered_input_keys()
is going to return a dictionary of OrderedDict’s with keys based on each model’s name. Thus, we test for this scenario and use the same get_model_specific_batch()
function we used during training to extract a sample instance for the current model being converted.
Next, we iterate the ordered input keys and call .unsqueeze(0)
on each parameter of the sample instance. The reason for this is because the forward()
function expects a batch size as the first dimension of the input data; .unsqueeze(0)
adds a dimension of 1 onto the tensors representing each parameter’s data.
Now we are ready to run the trace, passing the model, the example inputs and setting two parameters to false. The strict
parameter controls whether or not you want the tracer to record mutable containers. By turning this off, you can allow, for example, your outputs = model(**batch)
to remain a dict instead of a tuple. But you must be sure that the mutable containers used in your model aren’t actually mutated. check_trace
checks that the same inputs run through the traced code produce the same outputs; in our case, leaving this True
was producing odd errors, likely because of some internal non-deterministic operations, so we set it to False. Again, the ultimate test of the performance of the model is the inference step which we will be discussing next.
Finally, we save the traced model to local disk so it can be uploaded to s3. The final step of the train.py file is to upload all of these generated files to S3. In the case of a tuning job, we only retain the generated files from the run with the best objective metric score:
And with that, we have completed discussing the training/tuning step of the ML Pipeline. Next, we will look at the inference step where we load the torchscript model, perform inference on the unseen test set and collect statistics.
Inference
In the Training/Tuning step, we convert our best model into torchscript which means it can easily run on the CPU or multi-CPU environment. This enables us to hijack a Sagemaker Processor instance to perform our inference job. Like the previous sections, we will first look at how an inference job is initiated. Because we can use a Processor instance, it is identical to our Data Preparation step except for pointing it at our /test/
data and our inference.py
file.
Refer to the Data Preparation section of the second post to learn more about Processor/ScriptProcessor jobs. Note the differences of input_source_dir
pointing at /test/
and `code` pointing at inference.py
. Since these are so similar, we will move on to looking at the inference.py file.
We’ve discussed repeatedly the importance of run_num
and how it is used to help identify the current experiment not only while training, but also the current model in production (so a production model can be linked to a training experiment). The inference.py
will use the experiment parent directory to find the test data and the run_num
to find the correct trained model.
The inference.py
starts by downloading the id2label file so we can translate between model predictions and human-readable predictions:
Recall from previous sections that the ML pipeline is capable of running comparison jobs (n models trained and tested on the same dataset). Inference is the step where comparison really shines, allowing you to compare performance on identical data. In the next code block, we will load n models to prepare for inference. Recall that if a single model was trained, it is passed as a list with a single value:
This loop iterates the model names, downloads/loads the torchscript converted model and initializes statistics tracking for each. Let’s take a look at each inner function:
This function constructs the path the .pt file will be behind and downloads the .pt file. It then calls torch.jit.load
and sets the model to eval mode, ready for inference. init_model_stats
initializes values we will track per model, for each label which provides us facts that we can use to build statistics:
And init_metrics()
simply loads the metrics we used earlier in the training step:
Next, we get the test data from the Data Preparation step:
With the models and data loaded, we are now ready to run inference:
The inference code will use config.is_comparison
repeatedly to execute code specific to comparison jobs. It starts by initializing statistics specifically for comparisons which we will skip for now. Next, it enters the main loop which iterates through each instance of unseen test data. The ground truth label is extracted and execution enters the inner loop over the model names (in the case of one model this is just a List with a single entry). is_comparison
is called to extract the data specific to the current model using the same function used in Training (get_model_specific_batch
). The instance is then prepared for the forward()
function using the same technique we used in covert_to_torchscript
: each value gets .unsqueeze(0)
called in order to add a batch size of 1 as the first dimension of the tensor.
We then grab the currently loaded model and pass the instance to it. We extract the most confident prediction from the returned logits by calling argmax(-1)
. Now let’s look at the remainder of the loop (note this begins inside the inner loop):
We take the prediction produced by the model and pass it and the ground truth to our accuracy and f1 metrics. We then increment the counters we initialized at the beginning:
If inference.py is running a comparison job, we then add counts to the structure we initialized earlier; we will skip over these calls and jump to `process_statistics` which occurs after the inference code has finished looping:
This function looks intimidating, but all it is doing is calculating the F1 score and Accuracy per label, sorting the results by F1 score descending, calculating the overall F1 and Accuracy and uploading the results to S3 under the correct parent dir and run_num.
If you’ve followed the ML Pipeline blogs up to this point, it is prescient to revisit the folder structure that is built on S3 while the entire pipeline executes that we laid out in the first blog:
This folder structure recurs for every machine learning experiment, containing everything one would need to quickly understand the experiment or reproduce it and link an experiment to what is in production.
Prima facie, it seems like a simple part of the overall pipeline, but I believe it is one of the most important: imbuing each experiment with desirable properties like navigability, readability, reproducibility, versioning and more.
If you’ve been following these blogs up to this point then you’ve been on quite a journey. I hope they provide some guidance in setting up your own ML Pipeline. As we continue to modify ours we will post on blog-worthy topics so stay tuned. If you can check out the first two posts in the series here: Part One: Setup, Part Two: Data Steps.