MLops: Kubeflow with Feature Store Feast in Google Cloud Platform with Batch Sources

@credits Medium Source

Feast (Feature Store) is an operational data system for managing and serving machine learning features to models in production. The Feature Store design pattern simplifies the management and reuse of features across projects by decoupling the feature creation process from the development of models using those features.

Problems with Feature Engineering in Machine Learning

Good feature engineering is crucial for the success of many machine learning solutions. However, it is also one of the most time-consuming parts of model development. Some features require significant domain knowledge to calculate correctly, and changes in the business strategy can affect how a feature should be computed. To ensure such features are computed in a consistent way, it’s better for these features to be under the control of domain experts rather than ML engineers. An ML engineer or data scientist will typically experiment with multiple different transformations to determine which are helpful and which aren’t, before deciding which features will be used in the final model. Many times, the data used for the ML model isn’t drawn from a single source. Some data may come from a data warehouse, some data may sit in a storage bucket as unstructured data, and other data may be collected in real time through streaming. The structure of the data may also vary between each of these sources, requiring each input to have its own feature engineering steps before it can be fed into a model. This development is often done on a VM or personal machine, causing the feature creation to be tied to the software environment where the model is built, and the more complex the model gets, the more complicated these data pipelines become.

An ad hoc approach where features are created as needed by ML projects may work for one-off model development and training, but as organizations scale, this method of feature engineering becomes impractical and significant problems arise:

  • Ad hoc features aren’t easily reused. Features are re-created over and over again, either by individual users or within teams, or never leave the pipelines (or notebooks) in which they are created. This is particularly problematic for higher-level features that are complex to calculate. This could be because they are derived through expensive processes, such as pre-trained user or catalog item embeddings. Other times, it could be because the features are captured from upstream processes such as business priorities, availability of contracting, or market segmentations. Another source of complexity is when higher-level features, such as the number of orders by a customer in the past month, involve aggregations over time. Effort and time are wasted creating the same features from scratch for each new project.

Problems Feast Solves

The solution is to create a shared feature store, a centralised location to store and document feature datasets that will be used in building machine learning models and can be shared across projects and teams. The feature store acts as the interface between the data engineer’s pipelines for feature creation and the data scientist’s workflow building models using those features. This way, there is a central repository to house precomputed features, which speeds development time and aids in feature discovery. This also allows the basic software engineering principles of versioning, documentation, and access control to be applied to the features that are created.

A typical feature store is built with two key design characteristics: tooling to process large feature data sets quickly, and a way to store features that supports both low-latency access (for inference) and large batch access (for model training). There is also a metadata layer that simplifies documentation and versioning of different feature sets and an API that manages loading and retrieving feature data.

Bridge between raw data sources and model training and serving.

Models need consistent access to data: ML systems built on traditional data infrastructure are often coupled to databases, object stores, streams, and files. A result of this coupling, however, is that any change in data infrastructure may break dependent ML systems. Another challenge is that dual implementations of data retrieval for training and serving can lead to inconsistencies in data, which in turn can lead to training-serving skew.

Feast decouples your models from your data infrastructure by providing a single data access layer that abstracts feature storage from feature retrieval. Feast also provides a consistent means of referencing feature data for retrieval, and therefore ensures that models remain portable when moving from training to serving.

Deploying new features into production is difficult: Many ML teams consist of members with different objectives. Data scientists, for example, aim to deploy features into production as soon as possible, while engineers want to ensure that production systems remain stable. These differing objectives can create an organizational friction that slows time-to-market for new features.

Feast addresses this friction by providing both a centralized registry to which data scientists can publish features, and a battle-hardened serving layer. Together, these enable non-engineering teams to ship features into production with minimal oversight.

Models need point-in-time correct data: ML models in production require a view of data consistent with the one on which they are trained, otherwise the accuracy of these models could be compromised. Despite this need, many data science projects suffer from inconsistencies introduced by future feature values being leaked to models during training.

Feast solves the challenge of data leakage by providing point-in-time correct feature retrieval when exporting feature datasets for model training.
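To make the idea of point-in-time correctness concrete, here is a minimal sketch in plain Python. It is not how Feast implements the join internally; the feature history, the driver ID, and the helper name point_in_time_lookup are all hypothetical. For each entity timestamp it picks the latest feature value observed at or before that timestamp, so no future value can leak into a training row.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history for one driver, sorted by event timestamp.
feature_rows = {
    1001: [
        (datetime(2021, 1, 1), 0.10),
        (datetime(2021, 1, 2), 0.20),
        (datetime(2021, 1, 3), 0.30),
    ]
}


def point_in_time_lookup(history, entity_id, as_of):
    """Return the latest feature value observed at or before `as_of`, else None."""
    rows = history.get(entity_id, [])
    timestamps = [ts for ts, _ in rows]
    # Index of the first row strictly after `as_of`.
    idx = bisect_right(timestamps, as_of)
    if idx == 0:
        return None  # no value was known yet at that time
    return rows[idx - 1][1]


# Entity rows: (driver_id, timestamp of the training example).
entity_rows = [
    (1001, datetime(2021, 1, 2, 12)),
    (1001, datetime(2020, 12, 31)),
]
training_rows = [
    (eid, ts, point_in_time_lookup(feature_rows, eid, ts)) for eid, ts in entity_rows
]
print(training_rows)
```

The first entity row is joined to the value from 2 January, not the later value from 3 January, and the second row gets no value because nothing had been recorded yet.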

Features aren’t reused across projects: Different teams within an organization are often unable to reuse features across projects. The siloed nature of development and the monolithic design of end-to-end ML systems contribute to duplication of feature creation and usage across teams and projects.

Feast addresses this problem by introducing feature reuse through a centralized system (a registry). This registry enables multiple teams working on different projects not only to contribute features, but also to reuse these same features. With Feast, data scientists can start new ML projects by selecting previously engineered features from a centralized registry, and are no longer required to develop new features for each project.

Performance Trade-offs of Feast

The feature store design pattern can handle both the requirement that data be highly scalable for large batches during training and the requirement of extremely low latency for serving online applications.

Different systems may produce data at different rates, and a feature store is flexible enough to handle those different cadences, both for ingestion and during retrieval. For example, sensor data could be produced in real time, arriving every second, or there could be a monthly file that is generated from an external system reporting a summary of the last month’s transactions. Each of these needs to be processed and ingested into the feature store. By the same token, there may be different time horizons for retrieving data from the feature store. For example, a user-facing online application may operate at very low latency using up-to-the-second features, whereas when training the model, features are pulled offline as a larger batch but with higher latency.

There is no single database that can handle both scaling to potentially terabytes of data and extremely low latency on the order of milliseconds. The feature store achieves this with separate online and offline feature stores and ensures that features are handled in a consistent fashion in both scenarios.
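The split between the two stores can be illustrated with a toy sketch. This is not Feast code: the class TinyFeatureStore and its methods are hypothetical, timestamps are plain integers, and both stores are in-memory Python objects. The point is only that one ingestion path writes to an append-only offline history (for batch training) and to a latest-value online view (for low-latency lookups).

```python
class TinyFeatureStore:
    """Toy model of a feature store with separate offline and online stores."""

    def __init__(self):
        self.offline = []      # append-only history, scanned in batch for training
        self.online = {}       # latest feature values per entity, for fast lookups
        self._latest_ts = {}   # event timestamp of the value held in the online store

    def ingest(self, entity_id, event_ts, features):
        # Every event is kept in the offline history.
        self.offline.append((entity_id, event_ts, dict(features)))
        # The online store only moves forward in time, so late data cannot overwrite it.
        if event_ts >= self._latest_ts.get(entity_id, event_ts):
            self.online[entity_id] = dict(features)
            self._latest_ts[entity_id] = event_ts

    def get_online(self, entity_id):
        return self.online.get(entity_id)

    def get_offline(self, entity_id):
        return [row for row in self.offline if row[0] == entity_id]


store = TinyFeatureStore()
store.ingest(1001, 10, {"conv_rate": 0.1})
store.ingest(1001, 20, {"conv_rate": 0.2})
store.ingest(1001, 15, {"conv_rate": 0.15})  # late-arriving event

print(store.get_online(1001))
print(len(store.get_offline(1001)))
```

The late event lands in the offline history but does not replace the newer value in the online view, which is the kind of consistency a real feature store has to guarantee across both stores.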

Lastly, a feature store acts as a version-controlled repository for feature datasets, allowing the same CI/CD practices of code and model development to be applied to the feature engineering process. This means that new ML projects start with a process of feature selection from a catalog instead of having to do feature engineering from scratch, allowing organizations to achieve an economies-of-scale effect — as new features are created and added to the feature store, it becomes easier and faster to build new models that reuse those features.

Kubeflow & Feast Installation

PART A Kubeflow

~ Deploy AI Platform Pipelines with full access to Google Cloud by following the guide at https://cloud.google.com/ai-platform/pipelines/docs/setting-up.

~ Once AI Platform Pipelines is set up, it creates a cluster. Navigate to the Kubernetes cluster, activate Cloud Shell using the first button at the top right, and connect to the cluster.

~ Install Helm 3 by running the three commands below:

curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh

~ Install kubectl by following the steps in its installation guide.

PART B Feast

Add the Feast Helm repository and download the latest charts. Feast includes a Helm chart that installs all the components needed to run Feast Core and Feast Online Serving.

helm repo add feast-charts https://feast-helm-charts.storage.googleapis.com
helm repo update
helm install feast-release feast-charts/feast

Feast Core requires Postgres to run, which requires a secret to be set on Kubernetes:

kubectl create secret generic feast-postgresql --from-literal=postgresql-password=password

Use Kubernetes Operator for Apache Spark

The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications.

The operator consists of:

  • a SparkApplication controller that watches events of creation, updates, and deletion of SparkApplication objects and acts on the watch events,
Spark Operator Architecture

Specifically, a user uses sparkctl (or kubectl) to create a SparkApplication object. The SparkApplication controller receives the object through a watcher from the API server, creates a submission carrying the spark-submit arguments, and sends the submission to the submission runner. The submission runner submits the application to run and creates the driver pod of the application. Upon starting, the driver pod creates the executor pods. While the application is running, the Spark pod monitor watches the pods of the application and sends status updates of the pods back to the controller, which then updates the status of the application accordingly.
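For orientation, the object a user creates looks roughly like the manifest built below. This is a sketch, not a manifest taken from this setup: the application name, image, main file, resource sizes, and service account are hypothetical placeholders, and only the general shape follows the operator's SparkApplication custom resource (API group sparkoperator.k8s.io/v1beta2).

```python
import json


def spark_application(name, namespace, image, main_file, service_account):
    """Build a SparkApplication manifest as a plain dict (all values are placeholders)."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": "Python",
            "pythonVersion": "3",
            "mode": "cluster",
            "image": image,
            "mainApplicationFile": main_file,
            "sparkVersion": "3.0.0",
            "restartPolicy": {"type": "Never"},
            # The driver pod is created first and then starts the executor pods.
            "driver": {"cores": 1, "memory": "512m", "serviceAccount": service_account},
            "executor": {"cores": 1, "instances": 1, "memory": "512m"},
        },
    }


manifest = spark_application(
    name="example-job",                       # hypothetical
    namespace="spark-operator",
    image="gcr.io/project_id/spark-py:v1",    # hypothetical image
    main_file="local:///app/job.py",          # hypothetical entry point
    service_account="spark",                  # hypothetical service account
)
print(json.dumps(manifest, indent=2))
```

Since JSON is accepted by kubectl, the printed output could be saved to a file and applied with kubectl apply -f, after which the controller takes over as described above.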

Go to the GitHub link, follow the steps in the spark.md file, and set up Spark in the Kubernetes cluster.

Next, run the command below and, in the editor that opens, add the service accounts under the subjects tag.

kubectl edit clusterrolebinding spark-operatorsparkoperator-crb

Lastly, make sure that the service account used by Feast has permissions to manage SparkApplication resources. This depends on your k8s setup, but typically you’d need to configure a Role and a RoleBinding (saved here as role-binding.yaml) and apply them:

kubectl edit clusterrolebinding spark-operatorsparkoperator-crb
kubectl apply -f role-binding.yaml -n spark-operator
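The contents of role-binding.yaml are not reproduced in this post. As a rough sketch of what such a file might contain, the snippet below builds a Role that allows managing sparkapplications resources and a RoleBinding that grants it to a service account. The names use-spark-operator and default, and the choice of verbs, are assumptions to adapt to your own cluster.

```python
import json


def spark_role_manifests(namespace, service_account="default"):
    """Return a Role and RoleBinding (as a v1 List) for managing SparkApplications."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": "use-spark-operator", "namespace": namespace},
        "rules": [
            {
                "apiGroups": ["sparkoperator.k8s.io"],
                "resources": ["sparkapplications"],
                "verbs": ["create", "delete", "deletecollection", "get",
                          "list", "update", "watch", "patch"],
            }
        ],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": "use-spark-operator", "namespace": namespace},
        "roleRef": {
            "kind": "Role",
            "name": "use-spark-operator",
            "apiGroup": "rbac.authorization.k8s.io",
        },
        # Assumed subject: the service account that the Feast jobs run as.
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


manifest_text = json.dumps(spark_role_manifests("spark-operator"), indent=2)
print(manifest_text)
```

Because JSON is also valid YAML, the printed text can be saved as role-binding.yaml and applied with the kubectl apply command above.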

Kubernetes secret for the default namespace

To connect to GCR from an environment other than GCP, you add an ImagePullSecrets field to the configuration for a Kubernetes service account. This is a type of Kubernetes secret that contains credential information.

Kubernetes Secret @credit google sources

Run the commands below:

### GCP KUBEFLOW
export SA_NAME=feastai
export NAMESPACE=default
export SECRETNAME=user-gcp-sa
PROJECT_ID=$(gcloud config list core/project --format='value(core.project)')

# Create service account
gcloud iam service-accounts create $SA_NAME \
  --display-name $SA_NAME --project "$PROJECT_ID"

# Grant permissions to the service account by binding roles
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
  --role=roles/storage.admin

# Note that you cannot bind multiple roles in one command.
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
  --role=roles/ml.admin

# Create credential for the service account
gcloud iam service-accounts keys create application_default_credentials.json \
  --iam-account $SA_NAME@$PROJECT_ID.iam.gserviceaccount.com

# Attempt to create a k8s secret. If it already exists, override it.
# The key file created above is used as the secret payload.
kubectl create secret generic user-gcp-sa \
  --from-file=user-gcp-sa.json=application_default_credentials.json \
  -n $NAMESPACE --dry-run=client -o yaml | kubectl apply -f -

kubectl create secret docker-registry $SECRETNAME -n $NAMESPACE \
  --docker-server=https://gcr.io \
  --docker-username=_json_key \
  --docker-email=user@example.com \
  --docker-password="$(cat application_default_credentials.json)"

Feast Internal DNS Service expose

The services deployed in the Kubernetes default namespace need to be exposed, with authentication, to a user-defined namespace (e.g. aniruddha-choudhury) so that the Feast client can access services such as Redis, Kafka, Feast Core, and Feast Online Serving.

The motivation for this section is that our Kubeflow pipeline runs in the user namespace, while Feast runs in the default namespace.

Internal service authentication in different namespace

Run the below command:

kubectl apply -f service-expose.yaml
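The service-expose.yaml file is not shown in this post. One common way to make a service from one namespace resolvable in another is a Service of type ExternalName, and the sketch below generates such manifests under that assumption. The service names and ports are taken from the client configuration used later in this post; whether the original file used ExternalName services, and how it handled authentication, is not known.

```python
import json


def external_name_service(name, source_ns, target_ns, port):
    """Alias `name` from `source_ns` into `target_ns` via an ExternalName Service."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name, "namespace": target_ns},
        "spec": {
            "type": "ExternalName",
            # Cluster-internal DNS name of the real service.
            "externalName": f"{name}.{source_ns}.svc.cluster.local",
            "ports": [{"port": port}],
        },
    }


# Service names and ports as used by the Feast client in this post.
feast_services = {
    "feast-release-feast-core": 6565,
    "feast-release-feast-online-serving": 6566,
    "feast-release-redis-headless": 6379,
}

manifests = {
    "apiVersion": "v1",
    "kind": "List",
    "items": [
        external_name_service(name, "default", "aniruddha-choudhury", port)
        for name, port in feast_services.items()
    ],
}
print(json.dumps(manifests, indent=2))
```

With aliases like these in place, a hostname such as feast-release-feast-core.aniruddha-choudhury.svc.cluster.local resolves to the service running in the default namespace.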

1. Building Feast feature set with Client connection

Data is stored in Feast using FeatureSets. A FeatureSet contains the data schema and data source information, for example when the data comes from a pandas dataframe. FeatureSets are how Feast knows where to source the data it needs for a feature, how to ingest it, and some basic characteristics about the data types. Groups of features can be ingested and stored together, and feature sets provide efficient storage and logical namespacing of data within these stores.

Once our feature set is registered, Feast will start an Apache Beam job to populate the feature store with data from the source. A feature set is used to generate both offline and online feature stores, which ensures developers train and serve their model with the same data. Feast ensures that the source data complies with the expected schema of the feature set.

There are four steps to ingesting feature data into Feast: create a FeatureSet, add entities and features, register the FeatureSet, and ingest feature data into the FeatureSet.

These steps are carried out as follows:

~ Connect to a Feast deployment by setting up a client with the Python SDK and create a GCP staging bucket

import json
import os
import random

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

# Create a uniquely named GCS staging bucket (notebook shell magic)
staging_bucket = f'gs://feast-staging-bucket-{random.randint(1000000, 10000000)}/'
!gsutil mb {staging_bucket}
staging_bucket

# Connect to Feast Core, Online Serving, and Redis through the user namespace
client = Client(
    core_url="feast-release-feast-core.aniruddha-choudhury.svc.cluster.local:6565",
    serving_url="feast-release-feast-online-serving.aniruddha-choudhury.svc.cluster.local:6566",
    spark_launcher="k8s",
    spark_staging_location=staging_bucket,
    spark_k8s_namespace="spark-operator",
    redis_host="feast-release-redis-headless.aniruddha-choudhury.svc.cluster.local:6379",
    historical_feature_output_location=f"{staging_bucket}historical",
)

~ An Entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before declaring the associated feature tables. Entities are used as keys to look up feature values and are used to join features between different feature sets when creating datasets for training or serving. The entity serves as an identifier for whatever relevant characteristic you have in the dataset. It is an object that can be modeled and about which information can be stored.

driver_id = Entity(name="driver_id", description="Driver identifier", value_type=ValueType.INT64)
# Daily updated features
acc_rate = Feature("acc_rate", ValueType.FLOAT)
conv_rate = Feature("conv_rate", ValueType.FLOAT)
avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32)
fare_amount = Feature("fare_amount", ValueType.INT32)
sales_status = Feature("sales_status", ValueType.INT32)

~ Register the FeatureSet. This creates a named feature set within Feast. The feature set contains no feature data. Feature tables are stored in a GCS bucket that is accessible from the Spark cluster. batch_source defines where the historical features are stored. It is also possible to have an optional stream_source, through which the feature values are delivered continuously.

# Offline data will be stored in this location
demo_data_location = os.path.join(os.getenv("FEAST_SPARK_STAGING_LOCATION", staging_bucket), "test_data")
driver_statistics_source_uri = os.path.join(demo_data_location, "driver_statistics")

driver_statistics = FeatureTable(
    name="driver_statistics",
    entities=["driver_id"],
    features=[
        acc_rate,
        conv_rate,
        avg_daily_trips,
        fare_amount,
        sales_status,
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_statistics_source_uri,
        date_partition_column="date",
    ),
)

~Registering entities and feature tables in Feast Core

client.apply(driver_id)
client.apply(driver_statistics)
print(client.get_feature_table("driver_statistics").to_yaml())

~Populating batch source

For both batch and stream sources, the following configurations are necessary:

  • Event timestamp column: Name of column containing timestamp when event data occurred. Used during point-in-time join of feature values to entity timestamps.

We can ingest the dataframe feature data into Feast using client.ingest(..).

Dataframe
client.ingest(driver_statistics, stats_df)
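The construction of stats_df is not shown in this post. The sketch below generates random rows with the columns the feature table expects (the driver_id entity, the datetime, created, and date columns named in the FileSource, and the five features). The helper name make_driver_rows, the date range, and the value ranges are made up for illustration.

```python
import random
from datetime import datetime, timedelta


def make_driver_rows(driver_ids, days, seed=0):
    """Generate one synthetic row per driver per day (all values are random)."""
    rng = random.Random(seed)
    start = datetime(2021, 1, 1)      # hypothetical first event day
    created = datetime(2021, 2, 1)    # hypothetical load time of the batch
    rows = []
    for day in range(days):
        event_ts = start + timedelta(days=day)
        for driver_id in driver_ids:
            rows.append({
                "driver_id": driver_id,
                "datetime": event_ts,        # event_timestamp_column
                "created": created,          # created_timestamp_column
                "date": event_ts.date(),     # date_partition_column
                "acc_rate": rng.random(),
                "conv_rate": rng.random(),
                "avg_daily_trips": rng.randint(0, 100),
                "fare_amount": rng.randint(5, 200),
                "sales_status": rng.randint(0, 1),
            })
    return rows


rows = make_driver_rows([1001, 1002], days=3)
print(len(rows))
```

With pandas installed, pd.DataFrame(rows) turns these rows into a dataframe that could play the role of stats_df in the ingest call above.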

2. Building Feast pipeline training and serving with Kubeflow

Feature stores work because they decouple feature engineering from feature usage, allowing feature development and creation to occur independently from the consumption of features during model development. As features are added to the feature store, they become available immediately for both training and serving and are stored in a single location. This ensures consistency between model training and serving.

For example, a model served as a customer-facing application may receive only 10 input values from a client, but those 10 inputs may need to be transformed into many more features via feature engineering before being sent to a model. Those engineered features are maintained within the feature store. It is crucial that the pipeline for retrieving features during development is the same as when serving the model. A feature store ensures that consistency.

Feast accomplishes this by using Beam on the backend for feature ingestion pipelines that write feature values into the feature sets, and uses Redis and BigQuery for online and offline (respectively) feature retrieval. As with any feature store, the ingestion pipeline also handles partial failure or race conditions that might cause some data to be in one storage but not the other.

Feast Kubeflow Architecture

2.1 RETRIEVING DATA FROM FEAST

For training a model, historical feature retrieval is backed by a GCS bucket and accessed using client.get_historical_features(...). In this case, we provide Feast with a pandas dataframe containing the entities and timestamps that feature data will be joined to. This allows Feast to produce a point-in-time correct dataset based on the features that have been requested.

import json
import os
from urllib.parse import urlparse

import gcsfs
import pandas as pd
from pyarrow.parquet import ParquetDataset

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat
# The original snippet uses `entities` and `Config` without importing them;
# these two imports assume the module layout of the Feast 0.9 Python SDK.
from feast.config import Config
from feast.staging import entities

# Feature references in the form <feature_table>:<feature>
features = {"Features": [
    "driver_statistics:avg_daily_trips",
    "driver_statistics:conv_rate",
    "driver_statistics:acc_rate",
    "driver_statistics:fare_amount",
    "driver_statistics:sales_status",
]}

# Entity dataframe: driver IDs plus the timestamps to join features to
entity_bucket = "gs://feast_data_source/data/entity.csv"
entities_with_timestamp = pd.read_csv(entity_bucket)
entities_with_timestamp["event_timestamp"] = pd.to_datetime(entities_with_timestamp["event_timestamp"])

# staging_bucket is the staging bucket created when the client was first set up
client = Client(
    core_url="feast-release-feast-core.aniruddha-choudhury.svc.cluster.local:6565",
    serving_url="feast-release-feast-online-serving.aniruddha-choudhury.svc.cluster.local:6566",
    spark_launcher="k8s",
    spark_staging_location=staging_bucket,
    spark_k8s_namespace="spark-operator",
    redis_host="feast-release-redis-headless.aniruddha-choudhury.svc.cluster.local:6379",
    historical_feature_output_location=f"{staging_bucket}historical",
)

job = client.get_historical_features(
    feature_refs=features["Features"],
    entity_source=entities.stage_entities_to_fs(
        entity_source=entities_with_timestamp,
        staging_location=staging_bucket,
        config=Config,
    ),
)

# get_output_file_uri will block until the Spark job is completed.
output_file_uri = job.get_output_file_uri()
# read_parquet is a user-defined helper that loads the Parquet output into pandas
features_outcome = read_parquet(output_file_uri)
Historical Features
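The retrieval snippet calls read_parquet, which is not defined in this post. A plausible version, given the gcsfs, ParquetDataset, and urlparse imports, is sketched below. It is an assumption about what the helper does, not the author's code: it dispatches on the URI scheme and, for GCS, reads the part files that a Spark job typically writes into the output directory.

```python
from urllib.parse import urlparse


def read_parquet(uri):
    """Load Parquet output into a pandas dataframe (sketch; local and GCS only)."""
    parsed = urlparse(uri)
    if parsed.scheme == "file":
        import pandas as pd  # imported lazily so the helper has no hard dependency

        return pd.read_parquet(parsed.path)
    if parsed.scheme == "gs":
        import gcsfs
        from pyarrow.parquet import ParquetDataset

        fs = gcsfs.GCSFileSystem()
        # Spark writes its output as a directory of part-* files.
        files = fs.glob(uri.rstrip("/") + "/part-*")
        return ParquetDataset(files, filesystem=fs).read().to_pandas()
    raise ValueError(f"Unsupported URI scheme: {parsed.scheme!r}")
```

Any other scheme raises a ValueError, so an unexpected output location fails loudly instead of returning an empty dataframe.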

2.2 KUBEFLOW PIPELINE

Below is the Feast Kubeflow pipeline, including the component service that retrieves historical features for training:

import kfp.dsl as dsl
import yaml
from kubernetes import client as k8s
import kfp.gcp as gcp
from kfp import components
from string import Template
import json
from kubernetes import client as k8s_client


@dsl.pipeline(
    name='Feast Service pipeline',
    description='End to End pipeline for Feast training and serving'
)
def feast_training_pipeline(
    feast_step_image="gcr.io/project_id/feast_kube/apachespark_job:v1",
    trainmodel_step_image="gcr.io/project_id/feast_kube/train_job:v1",
    evaluator_step_image="gcr.io/project_id/feast_kube/evals_job:v1",
    json_bucket="gs://feast_data_source/data/features.json",
    entity_bucket="gs://feast_data_source/data/entity.csv",
    staging_bucket="gs://feast-staging-bucket-3670014/",
    split_size=0.2,
    epochs=5,
    learning_rate=.001,
    batch_size=64,
    shuffle_size=1000,
    target_name="driver_statistics__sales_status",
    tensorboard_gcs_logs="gs://feast_data_source/taxi/logs",
    gcs_path="gs://feast_data_source/taxi/model",
    gcs_path_confusion="gs://feast_data_source/taxi/",
    mode="gcs",
    probability=0.5,
    serving_name="kfserving-feast-serving",
    serving_namespace="default",
    image="gcr.io/<project_id>/feast_kubeflow/serving_job:v1"
):
    """
    Pipeline
    """
    # PVC : PersistentVolumeClaim volume shared between the steps
    vop = dsl.VolumeOp(
        name='my-pvc',
        resource_name="my-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="1Gi"
    )

    # feast: retrieve historical features for training
    feast_step = dsl.ContainerOp(
        name='Feast Service',
        image=feast_step_image,
        command="python",
        arguments=[
            "/app/feast_service.py",
            "--staging-bucket", staging_bucket,
            "--json-bucket", json_bucket,
            "--entity-bucket", entity_bucket,
            "--target-name", target_name
        ],
        pvolumes={"/mnt": vop.volume}
    ).apply(gcp.use_gcp_secret("user-gcp-sa"))

    # trainmodel
    train_model_step = dsl.ContainerOp(
        name='train_model',
        image=trainmodel_step_image,
        command="python",
        arguments=[
            "/app/train.py",
            "--epochs", epochs,
            "--batch-size", batch_size,
            "--learning-rate", learning_rate,
            "--tensorboard-gcs-logs", tensorboard_gcs_logs,
            "--gcs-path", gcs_path,
            "--mode", mode,
        ],
        file_outputs={
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json"
        },
        pvolumes={"/mnt": feast_step.pvolume}
    ).apply(gcp.use_gcp_secret("user-gcp-sa"))

    # evaluationmodel
    evaluation_model_step = dsl.ContainerOp(
        name='evaluation_model',
        image=evaluator_step_image,
        command="python",
        arguments=[
            "/app/evaluator.py",
            "--probability", probability,
            "--gcs-path", gcs_path,
            "--gcs-path-confusion", gcs_path_confusion,
        ],
        file_outputs={
            "mlpipeline-metrics": "/mlpipeline-metrics.json",
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json"
        },
        pvolumes={"/mnt": train_model_step.pvolume}
    ).apply(gcp.use_gcp_secret("user-gcp-sa"))

    # KFServing InferenceService manifest, filled in from the pipeline parameters
    kfserving_template = Template("""{
        "apiVersion": "serving.kubeflow.org/v1alpha2",
        "kind": "InferenceService",
        "metadata": {
            "labels": {
                "controller-tools.k8s.io": "1.0"
            },
            "name": "$name",
            "namespace": "$namespace"
        },
        "spec": {
            "default": {
                "predictor": {
                    "custom": {
                        "container": {
                            "image": "$image"
                        }
                    }
                }
            }
        }
    }""")
    kfservingjson = kfserving_template.substitute({
        'name': str(serving_name),
        'namespace': str(serving_namespace),
        'image': str(image)})
    kfservingdeployment = json.loads(kfservingjson)

    # Deploy the serving resource once evaluation has finished
    serve = dsl.ResourceOp(
        name="serve",
        k8s_resource=kfservingdeployment,
        action="apply",
        success_condition="status.url"
    )
    serve.after(evaluation_model_step)


if __name__ == '__main__':
    import kfp.compiler as compiler
    pipeline_func = feast_training_pipeline
    pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
    compiler.Compiler().compile(pipeline_func, pipeline_filename)
  • @dsl.pipeline is a required decorator that includes the name and description properties.
Kubeflow Pipeline
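The Feast step of the pipeline runs /app/feast_service.py with four command-line flags, but the script itself is not shown in this post. A minimal sketch of how such an entry point might parse those flags is given below. Only the flag names come from the pipeline definition; the function name parse_args and the help texts are hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags that the 'Feast Service' pipeline step passes to the script."""
    parser = argparse.ArgumentParser(description="Retrieve historical features from Feast")
    parser.add_argument("--staging-bucket", required=True,
                        help="GCS bucket used for Spark staging and output")
    parser.add_argument("--json-bucket", required=True,
                        help="GCS path of the JSON file listing feature references")
    parser.add_argument("--entity-bucket", required=True,
                        help="GCS path of the CSV with entities and timestamps")
    parser.add_argument("--target-name", required=True,
                        help="Name of the label column in the retrieved dataset")
    return parser.parse_args(argv)


# Example invocation using the default values from the pipeline definition.
args = parse_args([
    "--staging-bucket", "gs://feast-staging-bucket-3670014/",
    "--json-bucket", "gs://feast_data_source/data/features.json",
    "--entity-bucket", "gs://feast_data_source/data/entity.csv",
    "--target-name", "driver_statistics__sales_status",
])
print(args.staging_bucket, args.target_name)
```

Inside the container, the parsed values would then feed the historical retrieval code from section 2.1, with the result written to the shared /mnt volume for the training step.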

2.3 FEAST PODS AND SERVICE MONITORING

Feast components export metrics that can provide insight into Feast behavior. The Feast Ingestion Job can be configured to push ingestion metrics to a StatsD instance. Metrics export to StatsD for the Ingestion Job is configured in the Job Controller’s configuration.

kubectl port-forward --namespace knative-monitoring $(kubectl get pod --namespace knative-monitoring --selector="app=grafana" --output jsonpath='{.items[0].metadata.name}') 8080:3000

Connect to Grafana and check the metrics and telemetry. To view ingestion metrics in Prometheus or some other metrics backend, use a metrics forwarder to forward ingestion metrics from StatsD to the metrics backend of choice (e.g. use prometheus-statsd-exporter to forward metrics to Prometheus).

**Conclusion**

This post demonstrates Feast, the open source feature store for machine learning developed by Google Cloud and Gojek. It is built around Google Cloud services, using BigQuery for offline model training and Redis for low-latency online serving. Apache Beam is used for feature creation, which allows for consistent data pipelines for both batch and streaming. The post walks through the end-to-end steps, and the GitHub link with the whole setup will be shared soon.

**Further Reference**

https://docs.feast.dev/

Author

Aniruddha Choudhury
Senior Data Scientist
