MLOps: Kubeflow with TensorFlow TFX pipelines, seamlessly and at scale

Machine learning workflows often run on-premises. To make them more productive, we can use managed cloud services. These services help us distribute and scale out workflow steps, run multiple experiments or trials in parallel, and build orchestrated pipelines for ML deployments.

87% of projects do not get past the experiment phase and so never make it into production.

Only a small fraction of real-world ML systems is composed of the ML code. Credit: Paper

Pipelines help us with many tasks:
- load our data in the required format and location
- perform data cleaning and feature engineering
- analyze and version our trained models
- serve those models at scale while avoiding training-serving skew, and more.

Kubeflow started as an open-source version of the way Google ran TensorFlow internally, based on a pipeline called TensorFlow Extended (TFX). It began as simply an easier way to run TensorFlow jobs on Kubernetes. Since then it has grown into a multi-architecture, multi-cloud framework for running entire machine learning pipelines. To integrate an ML system into a production environment, you need to orchestrate the steps in your ML pipeline. You also need to automate the pipeline's execution so your models are trained continuously.

Here’s the TOC for this article.

Table of Contents
1. Introducing Kubeflow Pipelines
2. TFX Architecture
3. TFX Building Blocks
4. Orchestrating the ML system using Kubeflow Pipelines
5. Conclusion

**Introducing Kubeflow Pipelines**

Kubeflow is an open source, Kubernetes-native platform for developing, orchestrating, deploying, and running scalable and portable ML workloads. It supports reproducibility and collaboration across the ML workflow lifecycle. With it you can manage end-to-end orchestration of ML pipelines, run your workflows in multiple or hybrid environments, and reuse building blocks across different workflows. Kubeflow also provides support for visualisation and collaboration in your ML workflow.

Kubeflow Architecture running on Kubernetes
  • DSL compiler: The DSL compiler transforms your pipeline’s Python code into a static configuration (YAML).
  • Pipeline Service: You call the Pipeline Service to create a pipeline run from the static configuration.
  • Kubernetes resources: The Pipeline Service calls the Kubernetes API server to create the necessary Kubernetes resources (CRDs) to run the pipeline.
  • Orchestration controllers: A set of orchestration controllers execute the containers needed to complete the pipeline. The containers execute within Kubernetes Pods on virtual machines. An example controller is the Argo Workflow controller, which orchestrates task-driven workflows.
  • Persistence agent and ML metadata: The Pipeline Persistence Agent watches the Kubernetes resources created by the Pipeline Service and persists the state of these resources in the ML Metadata Service.
  • Pipeline web server: The Pipeline web server gathers data from various services to display relevant views: the list of pipelines currently running, the history of pipeline execution, the list of data artifacts, debugging information about individual pipeline runs, execution status about individual pipeline runs.
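Here is a toy sketch of the compile-then-run flow in plain Python. It is not the Kubeflow Pipelines SDK. In the real SDK, compilation is a call such as `kfp.compiler.Compiler().compile(pipeline_func, 'pipeline.yaml')`. The `Step` class, the `compile_pipeline` function, and the image name are hypothetical. The sketch only shows the idea: a pipeline declared in Python becomes a static, serializable spec whose tasks an orchestrator can run in dependency order.

```python
import json
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Step:
    """Hypothetical stand-in for a pipeline component: name, container image, upstream steps."""
    name: str
    image: str
    depends_on: list = field(default_factory=list)


def compile_pipeline(name, steps):
    """Turn Python step objects into a static, JSON-serializable spec (the 'compile' phase)."""
    graph = {s.name: set(s.depends_on) for s in steps}  # node -> its predecessors
    order = list(TopologicalSorter(graph).static_order())  # raises CycleError on cycles
    by_name = {s.name: s for s in steps}
    return {
        "pipeline": name,
        "tasks": [
            {"name": n, "image": by_name[n].image, "dependencies": sorted(by_name[n].depends_on)}
            for n in order
        ],
    }


steps = [
    Step("example-gen", "hypothetical/tfx"),
    Step("statistics-gen", "hypothetical/tfx", ["example-gen"]),
    Step("schema-gen", "hypothetical/tfx", ["statistics-gen"]),
    Step("transform", "hypothetical/tfx", ["example-gen", "schema-gen"]),
    Step("trainer", "hypothetical/tfx", ["transform"]),
]
spec = compile_pipeline("taxi", steps)
print(json.dumps(spec, indent=2))
```

Keeping the compiled output as plain data is what lets the Pipeline Service hand it to Kubernetes without running any of your Python.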

In this article, we’ll describe how to tackle ML workflow operations using Kubeflow Pipelines with TFX integration.

**TFX Architecture**

TFX Architecture & Components

The following steps can be completed manually or by an automated pipeline:

  1. Data Ingestion: The first step is to extract the new training data from its data sources. The outputs of this step are data files that are used for training and evaluating the model.
  2. Data validation: TFDV validates the data against the expected (raw) data schema. The data schema is created and fixed during the development phase, before system deployment. The data validation steps detect anomalies related to both data distribution and schema skews. The outputs of this step are the anomalies (if any) and a decision on whether to execute downstream steps or not.
  3. Data transformation: After the data is validated, the data is split and prepared for the ML task by performing data transformations and feature engineering operations using TFT. The outputs of this step are data files to train and evaluate the model, usually in TFRecord format. In addition, the transformation artifacts that are produced help with constructing the model inputs and with exporting the saved model after training.
  4. Model training and tuning: To implement and train the ML model, use the tf.estimator or tf.keras APIs with the transformed data produced by the previous step. To select the parameter settings that lead to the best model, you can use Keras Tuner, a hyperparameter tuning library for Keras, or other services such as Katib. The output of this step is a saved model that is used for evaluation, and another saved model that is used for online serving of the model for prediction.
  5. Model evaluation and validation: When the model is exported after the training step, it’s evaluated on a test dataset with TFMA to assess model quality. TFMA evaluates the model quality as a whole and identifies the parts of the data on which the model isn’t performing well. This evaluation helps guarantee that the model is promoted for serving only if it satisfies the quality criteria. The criteria can include fair performance on various data subsets (for example, demographics and locations) and improved performance compared to previous models or a benchmark model. The output of this step is a set of performance metrics and a decision on whether to promote the model to production.
  6. Model serving for prediction: After the newly trained model is validated, it’s deployed as a microservice to serve online predictions using TensorFlow Serving. The output of this step is a deployed prediction service of the trained ML model. Alternatively, you can replace this step by storing the trained model in a model registry, after which a separate model-serving CI/CD process is launched.
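The two decision points in these steps can be sketched in plain Python. The first stops the pipeline if validation finds anomalies. The second pushes a model only if evaluation beats the baseline. Everything here is hypothetical: the function names, the schema, the toy "model" (it always predicts the mean tip), and the data. In TFX, components such as ExampleValidator, Evaluator, and Pusher play these roles.

```python
from statistics import mean

SCHEMA = {"fare": float, "tips": float}  # hypothetical expected schema


def validate(rows):
    """Return a list of anomalies: missing features or wrong types."""
    anomalies = []
    for i, row in enumerate(rows):
        for feature, typ in SCHEMA.items():
            if not isinstance(row.get(feature), typ):
                anomalies.append(f"row {i}: bad or missing '{feature}'")
    return anomalies


def train(rows):
    """Toy 'model': always predicts the mean tip seen in training."""
    avg = mean(r["tips"] for r in rows)
    return lambda row: avg


def evaluate(model, rows):
    """Mean absolute error on held-out rows (lower is better)."""
    return mean(abs(model(r) - r["tips"]) for r in rows)


def run_pipeline(train_rows, eval_rows, baseline_mae):
    anomalies = validate(train_rows + eval_rows)
    if anomalies:  # validation gate: stop before any downstream step runs
        return {"status": "stopped", "anomalies": anomalies}
    model = train(train_rows)
    mae = evaluate(model, eval_rows)
    # Evaluation gate: promote ("push") only if the new model beats the baseline.
    status = "pushed" if mae < baseline_mae else "not_pushed"
    return {"status": status, "mae": mae}


train_rows = [{"fare": 10.0, "tips": 2.0}, {"fare": 20.0, "tips": 4.0}]
eval_rows = [{"fare": 15.0, "tips": 3.0}]
print(run_pipeline(train_rows, eval_rows, baseline_mae=1.0))  # {'status': 'pushed', 'mae': 0.0}
```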

Why TFX?

Building this type of automation is non-trivial, and it becomes even more challenging when we consider the following complications:

  • Building one machine learning platform for many different learning tasks: Products can have substantially different needs in terms of data representation, storage infrastructure, and machine learning tasks. The machine learning platform must be generic enough to handle the most common set of learning tasks as well as be extensible to support one-off atypical use-cases.
  • Continuous training and serving: The platform has to support the case of training a single model over fixed data, but also the case of generating and serving up-to-date models through continuous training over evolving data (e.g., a moving window over the latest n days of a log stream).
  • Human-in-the-loop: The machine learning platform needs to expose simple user interfaces to make it easy for engineers to deploy and monitor the platform with minimal configuration. Furthermore, it also needs to help users with various levels of machine-learning expertise understand and analyze their data and models.
  • Production-level reliability and scalability: The platform needs to be resilient to disruptions from inconsistent data, software, user configurations, and failures in the underlying execution environment. In addition, the platform must scale gracefully to the high data volume that is common in training, and also to increases in the production traffic to the serving system.

With this type of platform, teams can easily deploy machine learning in production for a wide range of products, and best practices are applied across the platform's components. The TFX paper reports the results of deploying TFX in Google Play: less custom code, faster experiment cycles, and a 2% increase in app installs thanks to improved data and model analysis.

**TFX Building Blocks**

TensorFlow Extended (TFX) is a TensorFlow-based platform for performant machine learning in production, first designed for use within Google, but now mostly open sourced.

Kubeflow and our example ML workflows use the following TFX components as building blocks: TensorFlow Data Validation, TensorFlow Transform, TensorFlow Modeling, TensorFlow Model Analysis, and TensorFlow Serving.

1) TensorFlow Data Validation

TensorFlow Data Validation (TFDV) is a library for exploring and validating machine learning data. It is designed to be highly scalable and to work well with TensorFlow and TensorFlow Extended (TFX). One of its features is automated data schema generation, which describes expectations about the data such as required values, ranges, and vocabularies. TFDV lets you perform the kinds of analyses we discussed previously, such as generating schemas and validating new data against an existing schema. It also offers visualizations based on the Google PAIR project Facets.
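TFDV's real entry points include `tfdv.generate_statistics_from_csv()`, `tfdv.infer_schema()`, and `tfdv.validate_statistics()`. The sketch below does not use TFDV. It is a simplified, hypothetical plain-Python illustration of the same idea. First it infers a schema (numeric range or string vocabulary) from reference data. Then it flags values in new data that violate that schema. TFDV's actual schema is richer than this.

```python
def infer_schema(rows):
    """Infer a per-feature numeric range or string vocabulary from reference rows."""
    schema = {}
    for feature in rows[0]:
        values = [r[feature] for r in rows]
        if all(isinstance(v, (int, float)) for v in values):
            schema[feature] = {"type": "number", "min": min(values), "max": max(values)}
        else:
            schema[feature] = {"type": "string", "vocab": set(map(str, values))}
    return schema


def validate(rows, schema):
    """Return (row_index, feature, problem) tuples for values that violate the schema."""
    anomalies = []
    for i, row in enumerate(rows):
        for feature, spec in schema.items():
            if feature not in row:
                anomalies.append((i, feature, "missing"))
                continue
            v = row[feature]
            if spec["type"] == "number":
                if not isinstance(v, (int, float)) or not spec["min"] <= v <= spec["max"]:
                    anomalies.append((i, feature, "wrong type or out of range"))
            elif v not in spec["vocab"]:
                anomalies.append((i, feature, "unexpected value"))
    return anomalies


reference = [{"fare": 5.0, "payment_type": "Cash"}, {"fare": 40.0, "payment_type": "Credit Card"}]
schema = infer_schema(reference)
new = [{"fare": 12.0, "payment_type": "Cash"}, {"fare": 900.0, "payment_type": "Bitcoin"}]
print(validate(new, schema))
# [(1, 'fare', 'wrong type or out of range'), (1, 'payment_type', 'unexpected value')]
```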

TFX provides a pipeline component called StatisticsGen. It takes the output of the preceding ExampleGen component as input and generates statistics over the data:

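```python
from tfx.components import StatisticsGen

def statisticsgen(example_gen):
    # Compute dataset statistics over the examples emitted by ExampleGen.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    return statistics_gen
```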
TFX StatisticsGen Component Visualisation

TensorFlow Data Validation provides a built-in “skew comparator”. It detects large differences between the statistics of two datasets, such as training data and serving data. This isn’t the statistical definition of skew (a dataset that is asymmetrically distributed around its mean). For categorical features, TFDV defines skew as the L-infinity norm of the difference between the normalized value distributions of the two datasets. In other words, it is the largest absolute difference in the share of any single value.
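The L-infinity distance itself is simple to compute. The sketch below normalizes the value counts of one categorical feature in two datasets and takes the largest absolute difference across all values. The feature values and the 0.01 threshold are illustrative. In TFDV you set the threshold on the schema, for example `tfdv.get_feature(schema, 'payment_type').skew_comparator.infinity_norm.threshold = 0.01`. You then check it with `tfdv.validate_statistics(train_stats, schema, serving_statistics=serving_stats)`.

```python
from collections import Counter


def linf_distance(values_a, values_b):
    """L-infinity distance between the normalized value distributions of two samples."""
    pa, pb = Counter(values_a), Counter(values_b)
    na, nb = len(values_a), len(values_b)
    keys = set(pa) | set(pb)  # values missing from one side count as frequency 0
    return max(abs(pa[k] / na - pb[k] / nb) for k in keys)


train = ["Cash"] * 60 + ["Credit Card"] * 40
serving = ["Cash"] * 45 + ["Credit Card"] * 50 + ["Mobile"] * 5
d = linf_distance(train, serving)
print(round(d, 2))  # 0.15: the largest gap is on "Cash" (0.60 vs 0.45)
print("skew detected" if d > 0.01 else "no skew")
```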

2) TensorFlow Transform

TensorFlow Transform (TFT) is a library designed to preprocess data for TensorFlow, particularly for feature engineering. tf.Transform is useful for transformations that require a full pass over the dataset. Examples include normalizing an input value by its mean and standard deviation, converting strings to integers with a vocabulary built from all input examples, and assigning inputs to buckets based on the observed data distribution.

TFX Transform Architecture

When we use TFT for preprocessing, the output of tf.Transform is exported as a TensorFlow graph that is used for both training and serving. This TFT graph becomes part of the exported inference graph. Because the same transformations are applied in both stages, training-serving skew is prevented. After the trained model is deployed, any prediction request has its input data processed exactly as the training data was. No client-side preprocessing framework is needed.
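The two-phase idea behind tf.Transform can be illustrated without TensorFlow. First, a full "analyze" pass over the training data computes the needed constants. Then a "transform" step reuses those frozen results at both training and serving time. The function names below are hypothetical. In TFT, the analyzer results are embedded in the exported transform graph rather than captured in a Python closure.

```python
from statistics import mean, pstdev


def analyze(values):
    """Full pass over the training data: compute constants that need every example."""
    return {"mean": mean(values), "std": pstdev(values)}  # population std dev


def make_transform(constants):
    """Freeze the analyzer outputs into a single z-score transform function."""
    def transform(x):
        return (x - constants["mean"]) / constants["std"]
    return transform


train_fares = [5.0, 10.0, 15.0, 20.0]
transform = make_transform(analyze(train_fares))

train_features = [transform(x) for x in train_fares]  # used for training
serving_feature = transform(12.5)                      # the same function at serving time
print(serving_feature)  # 0.0, because 12.5 is the training mean
```

Since serving calls the same `transform` with the same frozen constants, a serving request cannot be preprocessed differently from the training data.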

Some useful transformation functions:

tft.scale_to_z_score(): Use this function to normalize a feature to a mean of 0 and a standard deviation of 1.

tft.bucketize(): This function bucketizes a feature into bins and returns the bin (bucket) index. The num_buckets argument sets the number of buckets. TFT then divides the values into approximately equal-sized buckets based on the quantiles of the data.
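Here is a simplified plain-Python sketch of quantile bucketization. It shows the idea behind tft.bucketize, not its implementation, which computes approximate quantiles in a distributed pass. Boundaries come from the training data so that each bucket receives roughly the same number of values. Each input is then mapped to a bucket index. The helper names and the data are hypothetical.

```python
from bisect import bisect_right
from statistics import quantiles


def quantile_boundaries(values, num_buckets):
    """num_buckets - 1 cut points that split the data into roughly equal-count buckets."""
    return quantiles(values, n=num_buckets)


def bucketize(x, boundaries):
    """Bucket index in [0, len(boundaries)]: the number of boundaries <= x."""
    return bisect_right(boundaries, x)


trip_miles = [0.5, 1.0, 1.2, 2.0, 2.5, 3.0, 4.0, 8.0, 12.0, 30.0]
bounds = quantile_boundaries(trip_miles, 4)
print(bounds)
print([bucketize(x, bounds) for x in trip_miles])
```

Unlike equal-width bins, quantile boundaries keep a long tail (the 30-mile trip here) from leaving most buckets empty.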

tft.pca(): This function computes a principal component analysis (PCA) for a given feature. PCA is a common dimensionality-reduction technique that linearly projects the data onto the subspace that best preserves its variance. It requires the argument output_dim, which sets the dimensionality of your PCA representation, along with a dtype. It returns a projection matrix of shape (input_dim, output_dim), and you multiply the input by this matrix (e.g. with tf.matmul) to get the reduced representation.
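For reference, here is the textbook PCA computation in symbols. There are n examples x_i in d dimensions, and k plays the role of output_dim. This describes the general technique, not TFT's internal implementation.

$$
\mu = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad
C = \frac{1}{n}\sum_{i=1}^{n} (x_i - \mu)(x_i - \mu)^{\top}
$$

$$
C\,w_j = \lambda_j\,w_j \quad (\lambda_1 \ge \lambda_2 \ge \dots \ge \lambda_d), \qquad
W = \begin{bmatrix} w_1 & \cdots & w_k \end{bmatrix} \in \mathbb{R}^{d \times k}, \qquad
z_i = W^{\top}(x_i - \mu) \in \mathbb{R}^{k}
$$

The eigenvectors w_j are orthonormal. As a result, the k components of z_i are uncorrelated and have variances λ_1, ..., λ_k. No other k-dimensional linear projection retains more total variance.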

tft.compute_and_apply_vocabulary(): This is one of the most powerful TensorFlow Transform functions. It computes all unique values of a feature column, ranks them by frequency, and maps each value to an index. This index mapping converts the feature to a numerical representation, and the function generates the required vocabulary assets for your graph behind the scenes. You can limit the vocabulary in two ways: keep only the top_k most frequent unique values, or keep only values whose frequency is at least frequency_threshold.
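The vocabulary logic can be sketched in plain Python. The steps are:
1. Count values over the whole training set.
2. Drop values whose count is below frequency_threshold.
3. Keep at most top_k values, ordered by frequency.
4. Map anything else to an out-of-vocabulary (OOV) id.

This is a simplified illustration, not TFT's implementation. Two details are assumptions: alphabetical tie-breaking, and a single OOV id equal to the vocabulary size. In TFT itself, unknown values map to default_value (-1 by default) unless num_oov_buckets is set.

```python
from collections import Counter


def build_vocabulary(values, top_k=None, frequency_threshold=None):
    """Return {value: index}, most frequent first; ties broken alphabetically (an assumption)."""
    counts = Counter(values)
    items = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    if frequency_threshold is not None:
        items = [(v, c) for v, c in items if c >= frequency_threshold]
    if top_k is not None:
        items = items[:top_k]
    return {v: i for i, (v, _) in enumerate(items)}


def apply_vocabulary(x, vocab):
    """Map a value to its index; unseen values get a single OOV id equal to len(vocab)."""
    return vocab.get(x, len(vocab))


payments = ["Cash"] * 5 + ["Credit Card"] * 3 + ["Mobile"] * 2 + ["Unknown"]
vocab = build_vocabulary(payments, top_k=2)
print(vocab)  # {'Cash': 0, 'Credit Card': 1}
print([apply_vocabulary(p, vocab) for p in ["Credit Card", "Mobile"]])  # [1, 2]
```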

tft.apply_saved_model(): This function lets us apply an entire TensorFlow model to a feature. We load a SavedModel with a given tag and signature_name, pass the inputs to the model, and get the model's predictions back.

TFX Transform Component Graph Tensors
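```python
import tensorflow as tf
import tensorflow_transform as tft
# Feature-key lists, constants (_VOCAB_SIZE, _OOV_SIZE, ...) and _transformed_name()
# are defined elsewhere in the module file, as in the TFX Chicago Taxi example.

def preprocessing_fn(inputs):
    """tf.Transform callback: maps raw feature tensors to transformed tensors."""
    outputs = {}
    for key in _DENSE_FLOAT_FEATURE_KEYS:  # normalize to mean 0, std dev 1
        outputs[_transformed_name(key)] = tft.scale_to_z_score(_fill_in_missing(inputs[key]))
    for key in _VOCAB_FEATURE_KEYS:  # strings -> integer ids via a vocabulary
        outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
            _fill_in_missing(inputs[key]), top_k=_VOCAB_SIZE, num_oov_buckets=_OOV_SIZE)
    for key in _BUCKET_FEATURE_KEYS:  # quantile-based bucket index
        outputs[_transformed_name(key)] = tft.bucketize(
            _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT)
    for key in _CATEGORICAL_FEATURE_KEYS:  # integer categories, passed through
        outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])
    # Label: did the tip exceed 20% of the fare? (0 when the fare is NaN)
    taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
    tips = _fill_in_missing(inputs[_LABEL_KEY])
    outputs[_transformed_name(_LABEL_KEY)] = tf.where(
        tf.math.is_nan(taxi_fare),
        tf.cast(tf.zeros_like(taxi_fare), tf.int64),
        tf.cast(tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))
    return outputs

def _fill_in_missing(x):
    """Densify a [batch, 1] SparseTensor, filling missing values with '' or 0."""
    default_value = '' if x.dtype == tf.string else 0
    return tf.squeeze(tf.sparse.to_dense(
        tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]), default_value), axis=1)
```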

Next, we add the Transform component to the pipeline and point it at the module file that contains preprocessing_fn:

```python
import os
from tfx.components import Transform

def transformation(example_gen, schema_gen):
    return Transform(examples=example_gen.outputs['examples'], schema=schema_gen.outputs['schema'],
                     module_file=os.path.abspath(_taxi_transform_module_file))
```
TFX Transform Component Artifacts Results

When we execute the Transform component, TFX applies the transformations defined in our module file to the input data, which was converted to TFRecords during the data ingestion step. The component then outputs the transformed data, a transform graph, and the required metadata.

3) TensorFlow Modeling

The Trainer component produces the model that will be put into production. In production, it transforms new data and uses the model to make predictions. Because the Transform steps are included in this model, the data preprocessing always matches what the model expects. This removes a huge potential source of errors when the model is deployed.

TFX Trainer Architecture
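```python
import tensorflow as tf
import tensorflow_transform as tft
# Feature-key lists, constants, and _transformed_names() are defined elsewhere
# in the module file, alongside preprocessing_fn.

def _build_estimator(tf_transform_dir, config, hidden_units=None):
    """Build a wide-and-deep classifier on top of the Transform output."""
    # Feature spec of the transformed inputs, as written by the Transform component.
    # (Older samples read it via metadata_io; tft.TFTransformOutput is the current API.)
    transformed_feature_spec = (
        tft.TFTransformOutput(tf_transform_dir).transformed_feature_spec())

    # Dense float features feed the DNN (deep) part of the model.
    real_valued_columns = [
        tf.feature_column.numeric_column(key, shape=())
        for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
    ]
    # Vocabulary ids (including OOV buckets) and bucket indices feed the linear (wide) part.
    categorical_columns = [
        tf.feature_column.categorical_column_with_identity(
            key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
        for key in _transformed_names(_VOCAB_FEATURE_KEYS)
    ]
    categorical_columns += [
        tf.feature_column.categorical_column_with_identity(
            key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
        for key in _transformed_names(_BUCKET_FEATURE_KEYS)
    ]

    return tf.estimator.DNNLinearCombinedClassifier(
        config=config,
        linear_feature_columns=categorical_columns,
        dnn_feature_columns=real_valued_columns,
        dnn_hidden_units=hidden_units or [100, 70, 50, 25])
```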

Originally, TFX supported only tf.Estimator models, and the Trainer component was designed solely for Estimators. The Trainer's default implementation used the trainer_fn() function as the entry point to the training process, and this entry point is very tf.Estimator-specific. The Trainer component also expected you to define the Estimator input functions, such as train_input_fn(), eval_input_fn(), and serving_receiver_fn().

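```python
import tensorflow as tf

# `taxi` and `model` are helper modules from the Chicago Taxi sample;
# arguments elided with ... in the original excerpt stay elided here.

def train_and_maybe_evaluate(hparams):
    schema = taxi.read_schema(hparams.schema_file)
    train_input = lambda: model.input_fn(...)
    eval_input = lambda: model.input_fn(...)
    serving_receiver_fn = lambda: model.example_serving_receiver_fn(...)
    train_spec = tf.estimator.TrainSpec(...)
    # The exporter writes the serving SavedModel; it is normally passed to EvalSpec(exporters=[...]).
    exporter = tf.estimator.FinalExporter('chicago-taxi', serving_receiver_fn)
    eval_spec = tf.estimator.EvalSpec(...)
    run_config = tf.estimator.RunConfig(
        save_checkpoints_steps=999, keep_checkpoint_max=1)
    estimator = model.build_estimator(...)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
    return estimator
```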

When training Keras or Estimator models, TFX can also take advantage of the following TensorFlow distribution strategies (tf.distribute):

TFX Multi-GPU Thread

MirroredStrategy: This strategy is relevant for multiple GPUs on a single instance, and it follows the synchronous training pattern. The MirroredStrategy is a good default if you train a machine learning model on a single node with multiple GPUs and your model fits in GPU memory.

MultiWorkerMirroredStrategy: This follows the design patterns of the MirroredStrategy, but it copies the variables across multiple workers (e.g. compute instances). The MultiWorkerMirroredStrategy is an option if one node isn’t enough for your model training.

TPUStrategy: This strategy lets you use Google Cloud’s TPUs. It follows the synchronous training pattern and basically works like MirroredStrategy, except on TPUs instead of GPUs.

ParameterServerStrategy: The ParameterServerStrategy uses one or more parameter-server nodes as the central repository for the model variables. Worker nodes compute updates and send them to those servers.

OneDeviceStrategy: The OneDeviceStrategy places all variables and computation on a single device. Its main purpose is to let you test the whole model setup before engaging in real distributed training.
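In TensorFlow, using a strategy usually means creating it and building the model inside its scope, e.g. `strategy = tf.distribute.MirroredStrategy()` followed by `with strategy.scope(): model = ...`. The sketch below does not use TensorFlow. It is a toy plain-Python simulation of the synchronous pattern that MirroredStrategy follows:
1. Each replica holds a copy of the weight.
2. Each replica computes a gradient on its own shard of the data.
3. The gradients are averaged (an all-reduce).
4. Every replica applies the identical update.

The model (a single weight fitted to y = 3x with squared error), the learning rate, and the data are made up for illustration.

```python
def gradient(w, shard):
    """d/dw of the mean squared error for the model y_hat = w * x on one shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def all_reduce_mean(values):
    """Stand-in for the all-reduce step: every replica receives the same average."""
    avg = sum(values) / len(values)
    return [avg] * len(values)


def train_mirrored(data, num_replicas=2, lr=0.01, steps=200):
    replicas = [0.0] * num_replicas  # one mirrored copy of the weight per device
    shards = [data[i::num_replicas] for i in range(num_replicas)]
    for _ in range(steps):
        grads = [gradient(w, s) for w, s in zip(replicas, shards)]
        grads = all_reduce_mean(grads)  # synchronous: wait for every replica's gradient
        replicas = [w - lr * g for w, g in zip(replicas, grads)]
    return replicas


data = [(x, 3.0 * x) for x in range(1, 9)]
weights = train_mirrored(data)
print(weights)  # both copies are identical and close to 3.0
```

The copies never drift apart, because every replica applies the same averaged gradient. That is what "synchronous" means in the strategy descriptions above.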

4) TensorFlow Model Analysis

TFMA is a library for evaluating TensorFlow models. It allows users to evaluate their models on large amounts of data in a distributed manner, using the same metrics defined for training. These metrics can be computed over different slices of data and visualized in Jupyter notebooks.

TFMA makes it easy to visualize the performance of a model across a range of circumstances, features, and subsets of its user population, helping to give developers the analytic insights they need to be confident their models will treat all users fairly.
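The core idea here is sliced evaluation: compute the same metric overall and for each value of a feature, so weak segments stand out. In TFMA, you declare slices in the evaluation config, for example with `tfma.SlicingSpec(feature_keys=['trip_start_hour'])`. The plain-Python sketch below only illustrates the computation. The records, the feature, and the accuracy metric are illustrative.

```python
from collections import defaultdict


def sliced_accuracy(records, slice_key):
    """Accuracy overall and per value of slice_key; records hold 'label', 'prediction', features."""
    totals, correct = defaultdict(int), defaultdict(int)
    for r in records:
        for s in ("Overall", f"{slice_key}={r[slice_key]}"):
            totals[s] += 1
            correct[s] += int(r["label"] == r["prediction"])
    return {s: correct[s] / totals[s] for s in totals}


records = [
    {"trip_start_hour": 8, "label": 1, "prediction": 1},
    {"trip_start_hour": 8, "label": 0, "prediction": 0},
    {"trip_start_hour": 23, "label": 1, "prediction": 0},
    {"trip_start_hour": 23, "label": 0, "prediction": 0},
]
for name, acc in sliced_accuracy(records, "trip_start_hour").items():
    print(f"{name}: {acc:.2f}")
```

The overall accuracy of 0.75 hides the fact that late-night trips score only 0.50. Surfacing that kind of gap is what sliced metrics are for.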