Archives 2022

Working on feature store using Python – Vertex AI Feature Store

In the previous section, we worked on a feature store for the creation and uploading of feature values using the GUI approach. In this section, we shall create another feature store, ingest values, and also fetch the values from the feature store.

We will be using the Python 3 notebook file to type commands for working on the feature store. Follow the following-mentioned steps to create a Python file and type the Python codes given in this section.

Step 1: Create a Python notebook file
Once the workbench is created, open Jupyterlab and follow the steps mentioned in Figure 9.17 to create a Python notebook file:

Figure 9.17: New launcher window of notebook

  1. Click the new launcher.
  2. Double-click the Python 3 notebook file to create one.

Step 2: Package installation
Run the following commands to install the google cloud AI platform package. (It will take a few minutes to install the packages):
USER=”–user”
!pip install {USER} google-cloud-aiplatform

Step 3: Kernel restart
Type the following commands in the next cell, to restart the kernel. (users can restart the kernel from the GUI as well):
import os
import IPython
if not os.getenv(“”):
    IPython.Application.instance().kernel.do_shutdown(True)

Step 4: Importing the installed packages
Run the following-mentioned codes in a new cell to import the required packages:
import google.cloud.aiplatform_v1
from google.cloud.aiplatform_v1.types import featurestore_service as fs_s
from google.cloud.aiplatform_v1.types import featurestore as fs
from google.cloud.aiplatform_v1.types import feature
from google.cloud.aiplatform_v1.types import entity_type
from google.cloud.aiplatform_v1.types import io
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud.aiplatform_v1.types.featurestore_service import ImportFeatureValuesRequest
from google.cloud.aiplatform_v1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1.types import featurestore_online_service
import datetime

Step 5: Setting up the project and other variables
Run the following-mentioned line of codes in a new cell to set the project to the current one and also define variables to store the path for multiple purposes:
Project_id=”vertex-ai-gcp-1”
featurestore_name=”employee_fs_pysdk”
Entity_name=”emp_entity_pysdk”
location = “us-central1”
endpoint = “us-central1-aiplatform.googleapis.com”

Step 6: Connecting to the feature store
Connection to the feature store is the first step to work on the feature store. We create a connection to the feature store through the service client to create and ingest values to it using FeaturestoreServiceClient.

FeaturestoreOnlineServingServiceClient is used to fetch the feature values from the feature store. Run the below-mentioned line of codes to complete the connection:
client_admin = google.cloud.aiplatform_v1.FeaturestoreServiceClient(client_options={“api_endpoint”: endpoint})
client_data = google.cloud.aiplatform_v1.FeaturestoreOnlineServingServiceClient(client_options={“api_endpoint”: endpoint})
fs_resource_path = client_admin.common_location_path(Project_id, location)

Step 7: Creation of feature store
Instead of using the feature store that has been created from the GUI approach, we will create a new one. Feature store name, location, and project information are already in Step 5 (Setting up the project and other variables). Run the following code to create the feature store. The status of the feature store will be displayed in the results, as shown in Figure 9.18. The feature store creation procedure is a long-running operation, they are asynchronous jobs. except PI calls like updating or removing feature stores follow the same procedure.
create_fs = client_admin.create_featurestore(
    fs_s.CreateFeaturestoreRequest(
        parent=fs_resource_path,
        featurestore_id=featurestore_name,
        featurestore=fs.Featurestore(
            online_serving_config=fs.Featurestore.OnlineServingConfig(
                fixed_node_count=1
print(create_fs.result())
client_admin.get_featurestore(name=client_admin.featurestore_path(Project_id, location, featurestore_name))

Figure 9.18: Feature store creation using Python

Entity type created successfully – Vertex AI Feature Store

Step 6: Entity type created successfully

The entity type is created successfully as shown in Figure 9.8 under the selected feature store:

Figure 9.8: Entity type created and listed on the landing page

  1. Click the newly created Entity type.

Step 7: Creation of features

Once the entity type is created, features need to be created before ingesting the values. Follow the steps mentioned in Figure 9.9 to create features:

Figure 9.9: Creation of features

  1. Click ADD FEATURES.

A new side tab will pop out to enter the features as shown in Figure 9.10, follow the below steps to create the features:

Figure 9.10: Adding user input for feature creation

  1. Enter the Feature name.
  2. Enter the Value type stored in that feature.
  3. Enter the Description for the feature.
  4. Click Add Another Feature to add new features.
  5. Click SAVE, once all the features are added.

Step 8: Features created successfully

Once the features are created successfully, they are displayed on the entity type page as shown in Figure 9.11:

Figure 9.11: Features listed under the entity type

  1. Newly created features are displayed in tabular format.
  2. Click on Ingest Values to add the feature values.

Step 9: Ingesting feature values

Follow the steps mentioned in Figure 9.12 to initiate the ingestion of feature values:

Figure 9.12: Importing data to features

  1. Data can be ingested from cloud storage or BigQuery. Select Cloud Storage CSV file.
  2. Select the CSV file from the cloud storage by clicking BROWSE.
  3. Click CONTINUE and follow the steps mentioned in Figure 9.13:

After selecting the data source, we need to map the columns of the data source to the features. Follow the steps mentioned in the Figure 9.13 to map the features:

Figure 9.13: Mapping of columns to features

  1. Add employee ID, since that is the column which is containing unique values.
  2. Select to enter the Timestamp manually. If data contains the timestamp values, the same column can be used here.
  3. Select the date and time.
  4. Map the column names in the CSV file to the features.
  5. Click INGEST to initiate the ingestion job.

Step 10: Ingestion job successful

Once the feature values are ingested successfully, the ingestion job status will be updated as shown in Figure 9.14:

Figure 9.14: Ingestion jobs of feature store

  1. The ingestion job is completed successfully.

Step 11: Landing page of feature store after the creation of feature store, entity type, and features

The landing page of the feature store is shown in Figure 9.15, all the features under entity type and feature store are listed and displayed in the tabular format:

Figure 9.15: Landing page of feature store after the creation of features

  1. Click the age feature. The window will navigate to the properties of the feature as shown in Figure 9.16:

Figure 9.16: Properties of feature

  1. For all the features, Feature Properties consisting of basic information and statistics are displayed.
  2. Metrics are populated if the monitoring feature is enabled for the feature store and for that particular feature.

Data for feature store exercise – Vertex AI Feature Store

For this exercise, data is downloaded from Kaggle (link is provided below) and the dataset is listed under CC0 public domain licenses. Data contains information regarding employee promotion data. Since we are not building any model from the data, we are considering only 5 attributes (employee ID, education, gender, no. of trainings, and age), and only 50 samples are considered.

https://www.kaggle.com/datasets/arashnic/hr-ana

feature_store_input bucket is created under us-centra1 (single region) and the CSV file is uploaded from the bucket as shown in Figure 9.2:

Figure 9.2: Data stored in cloud storage

Working on feature store using GUI

Before we ingest data to the feature store or feature, we need to create a feature store, entity, and features. Resources of the feature store can be created using GUI or Python code. Follow the below-mentioned steps to create the feature store resources using GUI:

Step 1: Opening of feature store.

The landing page of the Vertex AI is shown in Figure 9.3:

Figure 9.3: landing page of Vertex AI

  1. Click Feature Store to open.

Step 2: Landing page of feature store

Feature stores are region specific; the landing page provides information on the feature store under a specific region as shown in Figure 9.4:

Figure 9.4: Landing page of feature store

  1. Click CREATE FEATURESTORE.

The region needs to be selected in this step (region cannot be changed post this step).

Step 3: Creation of feature store

Follow the steps mentioned in Figure 9.5 to create a feature store:

Figure 9.5: Creation of feature store

  1. Provide a name for the feature store.
  2. Enable Online Serving if the features need to be made available for low-latency online serving.
  3. Since the volume of data is small, select Fixed Nodes and provide the value of 1.
  4. Click on CREATE.

Step 4: Feature store created successfully

Once the feature store is created it will be displayed on the landing page as shown in Figure 9.6:

Figure 9.6: Feature store created and listed on the landing page

  1. Newly created feature store.
  2. Click on Create Entity Type for its creation.

Step 5: Creation of entity type

Follow the steps shown in Figure 9.7 to create an entity type:

Figure 9.7: Creation of entity type

  1. Select the Region under which the feature store is created.
  2. All the feature stores under the region will be listed, select the newly created Featurestore.
  3. Provide Entity type name.
  4. Write the Description for the entity type.
  5. Feature monitoring (enable if the features need to be monitored).
  6. It enables the monitoring of feature stores and features. Feature store monitors CPU utilization, storage, and latency. Feature monitoring helps in monitoring changes in feature value distribution.
  7. Click CREATE.

Advantages of feature store – Vertex AI Feature Store

These are the advantages of feature store:

  • Extend features company-wide: Feature stores let you easily share features for training or serving. Different projects and use cases do not need feature re-engineering. Manage and deliver features from a central repository to preserve consistency throughout your business and prevent redundant efforts, especially for high-value features.

Vertex AI Feature Store lets people find and reuse features using search and filtering. View feature metadata to assess quality and usage. For instance, you may check feature coverage and feature value distribution.

  • Serving at scale: Online forecasts require low-latency feature serving, which Vertex AI Feature Store manages. Vertex AI Feature Store automatically builds and expands low-latency data serving infrastructure. You create features but outsource providing them. Data scientists may create new features without worrying about deployment using this management.
  • Reduce training-serving bias: Training-serving skew happens when your production feature data distribution differs from the one used to train your model. This skew causes disparities between a model’s training and production performance. Vertex AI Feature Store can handle training-serving bias with these examples:
    • Vertex AI Feature Store guarantees that feature values are ingested once and reused for training and serving. Without a feature store, training and serving features may use distinct code paths. Training and serving feature values may differ.
    • Vertex AI Feature Store offers previous data lookups for training. By collecting pre-prediction feature values, these lookups reduce data leakage.
  • Identify drift: Vertex AI Feature Store detects drift in feature data distribution. Vertex AI Feature Store monitors feature value dispersion. Retrain models using impacted features as feature drift rises.
  • Retention: The Vertex AI Feature Store preserves feature values for the allotted amount of time. This cap is determined by the feature values’ timestamp, not the date and time the values were imported. Values with timestamps that go beyond the limit are scheduled for deletion by Vertex AI Feature Store.

Disadvantages of feature store

A feature store has overhead, which can make data science more complicated, especially for smaller projects. A feature store may complicate matters if a business has numerous little data sets. Feature stores are ineffective when the data is so diverse that no standard modeling approach will assist. Reusing features created on separate data sources and metadata is tough. Additionally, a feature store might not be the ideal choice when the features are not time-dependent or when features are needed only for batch predictions.

Knowing Vertex AI feature store – Vertex AI Feature Store

Introduction

After learning about the pipelines of the platform, we will move to the feature store of GCP. In this chapter, we will start with an understanding of the feature store, and the advantages of features followed by a hands-on feature store.

Structure

In this chapter, we will cover the following topics:

  • Knowing Vertex AI feature store
  • Hierarchy of feature store
  • Advantages of feature store
  • Disadvantages of feature store
  • Working on feature store using GUI
  • Working on feature store using python
  • Deleting resources
  • Best practices for Feature store

Objectives

By the end of this chapter, users will have a good idea about the feature store, when to use it, and how to employ it with the web console of GCP and Python.

Knowing Vertex AI feature store

Vertex AI Feature Store is a centralized repository for managing and delivering machine learning features. To speed up the process of creating and delivering high-quality ML applications, many organizations are turning to centralized feature stores to facilitate the sharing, discovery, and re-use of ML features at scale.

The storage and processing power, as well as other components of the backend infrastructure, are handled by Vertex AI Feature Store, making it a fully managed solution. As a result of this strategy, data scientists may ignore the difficulties associated with delivering features into production and instead concentrate on the feature computation logic.

The feature store in Vertex AI is an integral aspect of the overall system. Use Vertex AI Feature Store on its own or include it in your existing Vertex AI workflows. For instance, the Vertex AI Feature Store may be queried for information to be used in the training of custom or AutoML models.

Hierarchy of feature store

The collection of entities for a certain entity time is stored in a feature store. Fields like entity ID, timestamp, and a series of attributes like feature 1, feature 2, and so on, are all defined for each entity type. The hierarchy of the feature store is described in Figure 9.1:

Figure 9.1: Hierarchy of feature store

  • Feature store: A top-level container for entity types, features, and their values.
  • Entity type: A collection of semantically related features (real or virtual).
  • Entity: An instance of the entity type.
  • Feature: A measurable property or attribute of an entity type.
  • Feature values: These contain values of the features at a specific point in time.

Pipeline artifacts stored in cloud storage – Pipelines using TensorFlow Extended

Step 10: Pipeline artifacts stored in cloud storage

Pipeline artifacts are stored in the cloud storage as shown in Figure 8.15:

  • Module folder contains Python files which we had pushed while creating transform and trainer components.
  • Output_model contains trained classification model.
  • Root folder contains artifacts of each of the component of the pipeline:

Figure 8.15: Pipeline artifacts stored in the cloud storage

Deletion of resources

We have utilized workbench, cloud storage to store the data and the artifacts of the pipeline. For deletion of resources, ensure to delete the workbench, clear the data stored in the cloud storage.

Conclusion

We learnt about the TFX, a few of its components and constructed pipeline using some of the standard components. Also, we understood how to use Kubeflow for the orchestration of TFX pipeline on vertex AI.

In the next chapter, we will start understanding and working on feature store of the Vertex AI.

Questions

  1. Which artifacts of the transform component is used in the training component of the pipeline?
  2. What are the different orchestration options TFX supports?
  3. Try using evaluator component between trainer and pusher component and re-construct the pipeline. (Use evaluator component to evaluate the model performance and push it only if the performance is good).

Creation of pipeline – Pipelines using TensorFlow Extended

Step 6: Creation of pipeline
Till now we have understood and executed certain components of the tfx pipeline (not all the components are needed for the pipeline construction). Let’s start constructing pipeline using example_gen, statistics_gen, schema_gen, transform, trainer and pusher components (Pusher component is used to push the trained model to a location, in this example trained model we will be pushed to the cloud storage):
• Define a function (_create_pipeline) to consisting of all the steps/components that needs to be included in the pipeline and returns the tfx pipeline. Run the below mentioned code in a new cell to define the pipeline function:
def _create_pipeline(pl_name, pipeline_root_folder, data_root,
module_file_transform, module_file_train, model_dir_save,
) -> tfx.dsl.Pipeline:
example_gen_csv = tfx.components.CsvExampleGen(input_base=data_root)
gen_statistics = tfx.components.StatisticsGen(examples=example_gen_csv.outputs[‘examples’])
gen_schema = tfx.components.SchemaGen(statistics=gen_statistics.outputs[‘statistics’])
transform_data = tfx.components.Transform(examples=example_gen_csv.outputs[‘examples’],schema=gen_schema.outputs[‘schema’],module_file=os.path.join(MODULE_FOLDER, TRANSFORM_MODULE_PATH))
model_trainer = tfx.components.Trainer(
module_file=module_file_train,
examples=example_gen_csv.outputs[‘examples’],
transform_graph=transform_data.outputs[‘transform_graph’],
schema=gen_schema.outputs[‘schema’],
train_args=tfx.proto.TrainArgs(num_steps=200),
eval_args=tfx.proto.EvalArgs(num_steps=10))
pusher = tfx.components.Pusher(
model=model_trainer.outputs[‘model’],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=model_dir_save)))
return tfx.dsl.Pipeline(
pipeline_name=pl_name,
pipeline_root=pipeline_root_folder,
components=[example_gen_csv,gen_statistics,gen_schema,transform_data,model_trainer,pusher])

Step 8: Defining a runner
As mentioned in the theory section, TFX is portable across environments and orchestration frameworks. TFX supports Airflow, Beam, and Kubeflow. It also provides flexibility for the developers to add their own orchestrators. Orchestrators must inherit TfxRunner. TFX orchestrators schedule pipeline components based on DAG dependencies using the logical pipeline object, which comprises pipeline args, components, and DAG. In our example we will be using Vertex Pipelines together with the Kubeflow V2 dag runner. In the code, we create runner using Kubeflow V2 dag and run it by passing all the pipeline parameters.
Run the following mentioned codes to define the runner:
trainer_file=”trainer.py”
file_transform=os.path.join(MODULE_FOLDER, TRANSFORM_MODULE_PATH)
file_train=os.path.join(MODULE_FOLDER, trainer_file)
pl_def_file = NAME_PIPELINE + ‘.json’
pl_runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=pl_def_file)
_ = pl_runner.run(
_create_pipeline(
pl_name=NAME_PIPELINE,
pipeline_root_folder=ROOT_PIPELINE,
data_root=INPUT_DATA_DIR,
module_file_transform=file_transform,
module_file_train=file_train,
model_dir_save=OUTPUT_MODEL_DIR))

Step 9: Pipeline execution
As the last step we need to execute the pipeline. Run the following mentioned codes in a new cell to start the pipeline execution:
import google.cloud
google.cloud.aiplatform.init(project=PROJECT_ID, location=”us-central1”)
job=google.cloud.aiplatform.pipeline_jobs.PipelineJob(template_path=pl_def_file,
display_name=NAME_PIPELINE)
job.run(sync=False)

Once the pipeline starts, users will be provided the link to check the status as shown in Figure 8.13:

Figure 8.13: Pipeline link

  1. Click on the link to check the status of the pipeline as shown in the following screenshot:

Figure 8.14: Pipeline executed successfully
Pipeline will take a few mins to complete the training and push the trained model to the cloud storage. If the pipeline is constructed using tfx or Kubeflow, the user interface of the pipeline in GCP will remain same as shown in Figure 8.14. Spend some time to understand each of the components and its artifacts in detail as described in the previous chapter.

Importing packages – Pipelines using TensorFlow Extended-2

Transform:
We will create file_transform.py which will contain information about the data labels and feature engineering steps:
• Run the following mentioned code in new cells to declare variable containing file name:
TRANSFORM_MODULE_PATH = ‘file_transform.py’

• Run the following mentioned codes in a new cell to create file_transform.py file. Preprocessing (preprocessing_fn) function is where the actual alteration of the dataset occurs. It receives and returns a tensor dictionary, where tensor refers to a Tensor or SparseTensor. In our example, we are not applying any transformations, code is just mapping to the output dictionary. (%%writefile command will create the .py file with the following code in it.):
%%writefile {TRANSFORM_MODULE_PATH}
import tensorflow as tf
import tensorflow_transform as tft
NAMES = [‘AF3’,’F7’,’F3’,’FC5’,’T7’,’P7’,’O1’,’O2’,’P8’,’T8’,’FC6’,’F4’,’F8’,’AF4’]
LABEL = ‘eyeDetection’
def preprocessing_fn(raw_inputs):
processed_data = dict()
for items in NAMES:
processed_data[items]=raw_inputs[items]
processed_data[LABEL] = raw_inputs[LABEL]
return processed_data

  1. Files needs to copy into GCS bucket, run the following line of code to copy it:
    !gsutil cp file_transform.py {MODULE_FOLDER}/
  2. Run the following mentioned lines of codes in a new cell to configure transform component. Transform component is taking inputs from the example_gen and schema_gen:
    transform_data = tfx.components.Transform(
    examples=example_gen_csv.outputs[‘examples’],schema=gen_schema.outputs[‘schema’],
    module_file=os.path.join(MODULE_FOLDER, TRANSFORM_MODULE_PATH))
    context_in.run(transform_data, enable_cache=False)

Output of the transform component is as shown in Figure 8.10. Transform graph is one of the artifacts generated by the transform component and it will be used for the trainer module.

Figure 8.10: Output of transform component
Run the following mentioned codes to check few records of the transformed data (This code will not be needed during the pipeline construction):
train_sam = os.path.join(transform_data.outputs[‘transformed_examples’].get()[0].uri,’Split-train’)
filenames_tfr = [os.path.join(train_sam, name) for name in os.listdir(train_sam)]
dataset = tf.data.TFRecordDataset(filenames_tfr, compression_type=’GZIP’)

for record in dataset.take(1):
sample = tf.train.Example()
sample.ParseFromString(record.numpy())
print(sample)

Output of 1 record will be as shown in Figure 8.11:

Figure 8.11: Transformed data example
Trainer:
The Trainer component is in charge of preparing the input data and training the model. It requires the ExampleGen examples, the transform, and the training code. TensorFlow Estimators, Keras models, or custom training loops can be used in the training code. When compared to other components, trainer requires more modifications in the code:
• Run the following mentioned code in the new cell, it will generate trainer.py file (Complete code is available in the repository). Trainer.py (training code) file will be the input for the trainer module. Trainer component generates two artifacts model (trained model itself) and modelrun which can be used for storing logs, this can be seen in Figure 8.12. The Trainer.py file contains four functions; high level description of those functions is:
o run_fn will be entry point to execute the training process
o _input_fn generates features and labels for training
o _get_serve_tf_examples_fn returns a function that parses a serialized tf.example
o _make_keras_model creates and returns the model for classification
%%writefile trainer.py
from typing import List
from absl import logging
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

COL_NAMES=[‘AF3’,’F7’,’F3’,’FC5’,’T7’,’P7’,’O1’,’O2’,’P8’,’T8’,’FC6’,’F4’,’F8’,’AF4’]
LABEL=”eyeDetection”
BATCH_SIZE_TRAIN = 40
BATCH_SIZE_EVAL = 20
def _input_fn(files,accessor,transform_output,size) -> tf.data.Dataset:
Creates datasets and apply transformations on them and return.
Refer the repository for code block of the function
return dataset.map(apply_transform_fn).repeat()
def _get_serve_tf_examples_fn(model, transform_output):
To parse the serialized examples and return.
Refer the repository for code block of the function
return serve_tf_examples_fn
def _make_keras_model() -> tf.keras.Model:
Create model with layers, loss functions to be used and return.
Refer the repository for code block of the function
return model_classification

def run_fn(fn_args: tfx.components.FnArgs):
tf_transform = tft.TFTransformOutput(fn_args.transform_output)
train_samples = _input_fn(_args.train_files, _args.data_accessor, tf_transform, =BATCH_SIZE_TRAIN)
eval_samples = _input_fn(_args.eval_files, _args.data_accessor, tf_transform, =BATCH_SIZE_EVAL)
model_classification = _make_keras_model()
model_classification.fit(
train_samples,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_samples,
validation_steps=fn_args.eval_steps)
sign = {
“serving_default”: _get_serve_tf_examples_fn(model_classification, tf_transform),
}
model_classification.save(fn_args.serving_model_dir, save_format=’tf’,signatures=sign)

  1. Run the following code to copy the trainer.py to GCS storage:
    !gsutil cp trainer.py {MODULE_FOLDER}/
  2. Run the following code to initiate the trainer component. trainer_file=”trainer.py”:
    trainer_file_path=os.path.join(MODULE_FOLDER, trainer_file)
    model_trainer = tfx.components.Trainer(
    examples=example_gen_csv.outputs[“examples”],
    transform_graph=transform_data.outputs[“transform_graph”],
    train_args=tfx.proto.TrainArgs(num_steps=200),
    eval_args=tfx.proto.EvalArgs(num_steps=10),
    module_file=trainer_file_path,
    )
    context_in.run(model_trainer, enable_cache=False)

The output of the trainer component is as shown in the following figure:

Figure 8.12: Output of the trainer component