derrickaw commented on code in PR #35568: URL: https://github.com/apache/beam/pull/35568#discussion_r2208324854
########## sdks/python/apache_beam/yaml/examples/transforms/ml/taxi-fare/custom_nyc_taxifare_model_deployment.ipynb: ########## @@ -0,0 +1,809 @@ +{ + "cells": [ + { + "cell_type": "code", + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ], + "metadata": { + "id": "ZpDmaAwXuRnG" + }, + "id": "ZpDmaAwXuRnG", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# NYC Taxi Fare Prediction - Model Training and Deployment\n", + "\n", + "<table><tbody><tr>\n", + " <td style=\"text-align: center\">\n", + " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ftaxi-fare%2Fcustom_nyc_taxifare_model_deployment.ipynb\">\n", + " <img alt=\"Google Cloud Colab Enterprise logo\" src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\" width=\"32px\"><br> Run in Colab Enterprise\n", + " </a>\n", + " </td>\n", + " <td style=\"text-align: center\">\n", + " <a href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi-fare/custom_nyc_taxifare_model_deployment.ipynb\">\n", + " <img alt=\"GitHub logo\" src=\"https://github.githubassets.com/assets/GitHub-Mark-ea2971cee799.png\" width=\"32px\"><br> View on GitHub\n", + " </a>\n", + " </td>\n", + "</tr></tbody></table>\n" + ], + "metadata": { + "id": "m916RPCn0NSS" + }, + "id": "m916RPCn0NSS" + }, + { + "cell_type": "markdown", + "source": [ + "## Overview\n", + "\n", + "This notebook demonstrates the training and deployment of a custom tabular regression model for online prediction.\n", + "\n", + "We'll train a [gradient-boosted decision tree (GBDT) model](https://en.wikipedia.org/wiki/Gradient_boosting) using [XGBoost](https://xgboost.readthedocs.io/en/stable/index.html) to predict the fare of a taxi trip in New York City, given information such as the pick-up date and time, pick-up location, drop-off location, and passenger count. The dataset is from the Kaggle competition https://www.kaggle.com/c/new-york-city-taxi-fare-prediction organized by Google Cloud.\n", + "\n", + "After model training and evaluation, we'll use the Vertex AI Python SDK to upload this custom model to Vertex AI Model Registry and deploy it to perform remote inferences at scale. The preferred way to run this notebook is within Colab Enterprise.\n", + "\n", + "## Outline\n", + "1. Dataset\n", + "\n", + "2. Training\n", + "\n", + "3. 
Evaluation\n", + "\n", + "4. Deployment\n", + "\n", + "5. Reference" + ], + "metadata": { + "id": "jGbLxUoraooN" + }, + "id": "jGbLxUoraooN" + }, + { + "cell_type": "markdown", + "source": [ + "We first install and import the necessary libraries to run this notebook." + ], + "metadata": { + "id": "e6zO5wWaMhaX" + }, + "id": "e6zO5wWaMhaX" + }, + { + "cell_type": "code", + "source": [ + "!pip3 install --quiet --upgrade \\\n", + " opendatasets \\\n", + " google-cloud-storage \\\n", + " google-cloud-aiplatform \\\n", + " scikit-learn \\\n", + " xgboost \\\n", + " pandas" + ], + "metadata": { + "id": "weUpgu9Y1OoF" + }, + "id": "weUpgu9Y1OoF", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "import opendatasets as od\n", + "import pandas as pd\n", + "import random\n", + "import time\n", + "import os\n", + "\n", + "from xgboost import XGBRegressor\n", + "from sklearn.model_selection import train_test_split\n", + "from sklearn.metrics import root_mean_squared_error\n", + "\n", + "import google.cloud.storage as storage\n", + "import google.cloud.aiplatform as vertex" + ], + "metadata": { + "id": "KJTsSdQKSN_m" + }, + "id": "KJTsSdQKSN_m", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Dataset\n", + "\n", + "We use the `opendatasets` library to programmatically download the dataset from Kaggle.\n", + "\n", + "We'll first need a Kaggle account and to register for this competition. We'll also need the API key, which is stored in the `kaggle.json` file that is downloaded automatically when you create an API token. Go to *Profile* picture -> *Settings* -> *API* -> *Create New Token*.\n", + "\n", + "The dataset download will prompt you to enter your Kaggle username and key. Copy this information from `kaggle.json`.\n" + ], + "metadata": { + "id": "DVWcleCz1AVl" + }, + "id": "DVWcleCz1AVl" + }, + { + "cell_type": "code", + "source": [ + "dataset_url = 'https://www.kaggle.com/c/new-york-city-taxi-fare-prediction'\n", + "od.download(dataset_url)" + ], + "metadata": { + "id": "8D-KUYKD1lg4" + }, + "id": "8D-KUYKD1lg4", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Among the downloaded files, we will only make use of the `test.csv` testing dataset and, primarily, the `train.csv` training dataset to train and evaluate our model." + ], + "metadata": { + "id": "NMCdiinpTF0W" + }, + "id": "NMCdiinpTF0W" + }, + { + "cell_type": "code", + "source": [ + "data_dir = 'new-york-city-taxi-fare-prediction'\n", + "!dir -l {data_dir}" + ], + "metadata": { + "id": "rmlERXShR457" + }, + "id": "rmlERXShR457", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "The training dataset contains approx. 55M rows. Reading the entire dataset into a pandas DataFrame (i.e. loading the entire dataset into memory) is slow and memory-intensive, which can affect operations in later parts of the notebook. For the purpose of experimenting with our model, it is also unnecessary.\n", + "\n", + "A good practice is to sample a small percentage of the training dataset."
+ ], + "metadata": { + "id": "Yv9Kq0v2T_1g" + }, + "id": "Yv9Kq0v2T_1g" + }, + { + "cell_type": "code", + "source": [ + "p = 0.01\n", + "# keep the header, then take only ~1% of the rows:\n", + "# a row is skipped if a random draw from [0, 1] is greater than p\n", + "df_train_val = pd.read_csv(\n", + " data_dir + \"/train.csv\",\n", + " header=0,\n", + " parse_dates=['pickup_datetime'],\n", + " skiprows=lambda i: i > 0 and random.random() > p\n", + ")\n", + "df_train_val.shape" + ], + "metadata": { + "id": "epJNJkp1W7P_" + }, + "id": "epJNJkp1W7P_", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "The training dataset, now as a DataFrame table, can be further inspected." + ], + "metadata": { + "id": "bzRYmrc-YDdd" + }, + "id": "bzRYmrc-YDdd" + }, + { + "cell_type": "code", + "source": [ + "df_train_val.columns" + ], + "metadata": { + "id": "AkJ2-w3BW7dD" + }, + "id": "AkJ2-w3BW7dD", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "df_train_val.info()" + ], + "metadata": { + "id": "AxAMXNTiKe2D" + }, + "id": "AxAMXNTiKe2D", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "df_train_val" + ], + "metadata": { + "id": "4LFxT3Zec8tX" + }, + "id": "4LFxT3Zec8tX", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "The testing dataset is much smaller and doesn't have the `fare_amount` column. Likewise, we can read the dataset as a DataFrame and inspect the data." + ], + "metadata": { + "id": "xxPAnGR1ZDwf" + }, + "id": "xxPAnGR1ZDwf" + }, + { + "cell_type": "code", + "source": [ + "df_test = pd.read_csv(data_dir + \"/test.csv\", parse_dates=['pickup_datetime'])\n", + "df_test.columns" + ], + "metadata": { + "id": "cWBexIlsW_4u" + }, + "id": "cWBexIlsW_4u", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "df_test" + ], + "metadata": { + "id": "bch6SYLxL51_" + }, + "id": "bch6SYLxL51_", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "We'll set aside 20% of the training data as the validation set to evaluate the model on previously unseen data." + ], + "metadata": { + "id": "SOeJDlsCZcY0" + }, + "id": "SOeJDlsCZcY0" + }, + { + "cell_type": "code", + "source": [ + "df_train, df_val = train_test_split(\n", + " df_train_val,\n", + " test_size=0.2,\n", + " random_state=42 # set random_state to a constant so we always get the same training/validation split\n", + ")\n", + "\n", + "print(\"Training dataset's shape: \", df_train.shape)\n", + "print(\"Validation dataset's shape: \", df_val.shape)" + ], + "metadata": { + "id": "qVN1ygVGOH33" + }, + "id": "qVN1ygVGOH33", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Training\n", + "\n", + "For a quick '0-to-1' model serving on Vertex AI, the model training process below is kept straightforward using the simple yet very effective [tree-based gradient boosting](https://en.wikipedia.org/wiki/Gradient_boosting) algorithm.
We start off with a simple feature engineering idea, before moving on to the actual training of the model using the [XGBoost](https://xgboost.readthedocs.io/en/stable/index.html) library.\n" + ], + "metadata": { + "id": "4Ov7Efuy1Gyj" + }, + "id": "4Ov7Efuy1Gyj" + }, + { + "cell_type": "markdown", + "source": [ + "### Simple Feature Engineering\n", + "\n", + "One of the columns in the dataset is the `pickup_datetime` column, which is of [datetimelike](https://pandas.pydata.org/docs/reference/api/pandas.Series.dt.html) type. This makes it easy to perform data analysis on time-series data such as this. However, ML models don't accept feature columns of such a non-numeric data type. Some conversion is needed, and here we'll choose to break this datetime column into multiple numeric feature columns.\n" + ], + "metadata": { + "id": "-CWgL07pNH7H" + }, + "id": "-CWgL07pNH7H" + }, + { + "cell_type": "code", + "source": [ + "def add_dateparts(df, col):\n", + " \"\"\"\n", + " This function splits the datetime column into separate columns such as\n", + " year, month, day, weekday, and hour\n", + " :param df: DataFrame table to add the columns\n", + " :param col: the column with datetime values\n", + " :return: None\n", + " \"\"\"\n", + " df[col + '_year'] = df[col].dt.year\n", + " df[col + '_month'] = df[col].dt.month\n", + " df[col + '_day'] = df[col].dt.day\n", + " df[col + '_weekday'] = df[col].dt.weekday\n", + " df[col + '_hour'] = df[col].dt.hour" + ], + "metadata": { + "id": "O-qK4QVk1mD8" + }, + "id": "O-qK4QVk1mD8", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "add_dateparts(df_train, 'pickup_datetime')\n", + "add_dateparts(df_val, 'pickup_datetime')\n", + "add_dateparts(df_test, 'pickup_datetime')" + ], + "metadata": { + "id": "huYKIwMvPS0H" + }, + "id": "huYKIwMvPS0H", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "df_train.info()" + ], + "metadata": { + "id": "ovKvngO5SKeb" + }, + "id": "ovKvngO5SKeb", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "df_train.head()" + ], + "metadata": { + "id": "YhbHtOxAP-13" + }, + "id": "YhbHtOxAP-13", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Gradient Boosting\n", + "\n", + "Predicting taxi fares is a supervised regression problem, and our dataset is tabular. It is well known in the literature (_[1]_, _[2]_) that [gradient-boosted decision tree (GBDT) models](https://en.wikipedia.org/wiki/Gradient_boosting) perform very well on this kind of problem and dataset.\n", + "\n", + "The input columns used for training (and subsequently for inference) will be the original feature columns (pick-up/drop-off longitude/latitude and the passenger count) from the dataset, along with the additional engineered features (pick-up year, month, day, etc.) that we generated above.
The target/label column for training is the `fare_amount` column.\n" + ], + "metadata": { + "id": "7IX7HX71NNz-" + }, + "id": "7IX7HX71NNz-" + }, + { + "cell_type": "code", + "source": [ + "input_cols = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count',\n", + " 'pickup_datetime_year', 'pickup_datetime_month', 'pickup_datetime_day', 'pickup_datetime_weekday',\n", + " 'pickup_datetime_hour']\n", + "\n", + "target_cols = 'fare_amount'\n", + "\n", + "train_inputs = df_train[input_cols]\n", + "train_targets = df_train[target_cols]\n", + "\n", + "val_inputs = df_val[input_cols]\n", + "val_targets = df_val[target_cols]\n", + "\n", + "test_inputs = df_test[input_cols]" + ], + "metadata": { + "id": "kASu8nxeNVEV" + }, + "id": "kASu8nxeNVEV", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "As noted before, we use the XGBoost library, which implements the GBDT machine learning algorithm in a scalable, distributed manner. Specifically, we'll use the [XGBRegressor](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.XGBRegressor) API and fit the training data with the standard squared-error loss function. The hyperparameters are chosen simply through trial and error, keeping whichever values give the best result." + ], + "metadata": { + "id": "BQnLy75K1cIF" + }, + "id": "BQnLy75K1cIF" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "xgb_model = XGBRegressor(objective='reg:squarederror',\n", + " n_jobs=-1,\n", + " random_state=42,\n", + " n_estimators=500,\n", + " max_depth=5,\n", + " learning_rate=0.05,\n", + " tree_method='hist',\n", + " subsample=0.8,\n", + " colsample_bytree=0.8)" + ], + "id": "af5fefaee2f81aa1" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "**Note**: The model should be trained on an array-like dataset (e.g. `numpy.ndarray`) instead of a pandas DataFrame or Series object.
This helps with passing/serializing the input data in requests for remote inference later on, and avoids a DataFrame/array mismatch error such as [this](https://datascience.stackexchange.com/questions/63872/lime-explainer-valueerror-training-data-did-not-have-the-following-fields).", + "id": "c0c95d56399f0968" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": "xgb_model.fit(train_inputs.values, train_targets.values)", + "id": "ba33a648416a5d71" + }, + { + "cell_type": "markdown", + "source": [ + "## Evaluation\n", + "\n", + "A typical metric used for model evaluation is the root mean squared error (RMSE).\n" + ], + "metadata": { + "id": "4P7CaoHy1YXA" + }, + "id": "4P7CaoHy1YXA" + }, + { + "cell_type": "code", + "id": "4x1Yx5kvj3DDcfH3AvpnPqBd", + "metadata": { + "tags": [], + "id": "4x1Yx5kvj3DDcfH3AvpnPqBd" + }, + "source": [ + "def evaluate(model):\n", + " \"\"\"\n", + " :param model: trained model to evaluate\n", + " :return: a tuple of training and validation RMSE results\n", + " \"\"\"\n", + " train_preds = model.predict(train_inputs)\n", + " train_rmse = root_mean_squared_error(train_targets, train_preds)\n", + " val_preds = model.predict(val_inputs)\n", + " val_rmse = root_mean_squared_error(val_targets, val_preds)\n", + "\n", + " return train_rmse, val_rmse\n", + "\n", + "training_rmse, validation_rmse = evaluate(xgb_model)\n", + "print(\"Training RMSE: \", training_rmse)\n", + "print(\"Validation RMSE: \", validation_rmse)" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "We finally make use of the testing dataset by running model inference on this test data. The predicted label is the `predicted_fare_amount` column." + ], + "metadata": { + "id": "oY1cDTDQADns" + }, + "id": "oY1cDTDQADns" + }, + { + "cell_type": "code", + "source": [ + "test_preds = xgb_model.predict(test_inputs)\n", + "result_df = df_test.copy()\n", + "result_df['predicted_fare_amount'] = test_preds" + ], + "metadata": { + "id": "7Pn2lrrLX3qS" + }, + "id": "7Pn2lrrLX3qS", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Deployment\n", + "\n", + "Once the model has been trained and evaluated, the next step is to make model serving possible on Vertex AI.\n", + "\n", + "Initialize the Vertex AI SDK for Python for your project.\n" + ], + "metadata": { + "id": "CAPlN6Qf1nDx" + }, + "id": "CAPlN6Qf1nDx" + }, + { + "cell_type": "code", + "source": [ + "PROJECT_ID = \"your-project-id\" # @param {type:\"string\"}\n", + "REGION = \"us-central1\" # @param {type:\"string\"}\n", + "BUCKET_URI = \"gs://your-bucket-name\" # @param {type:\"string\"}\n", + "\n", + "vertex.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)\n", + "\n", + "print(f\"Project: {PROJECT_ID} | Region: {REGION}\")" + ], + "metadata": { + "id": "iZA3Wg3OcA5Q" + }, + "id": "iZA3Wg3OcA5Q", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Save the trained model to the Google Cloud Storage bucket as model artifact."

Review Comment:

```suggestion
"Save the trained model to the Google Cloud Storage bucket as a model artifact."
```
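The deployment cells that follow the markdown cell quoted above (saving the model artifact to Cloud Storage, registering it in the Vertex AI Model Registry, and deploying it for online prediction) are cut off by this excerpt. As a rough, illustrative sketch only, not the PR's actual code, those steps could look something like the following, reusing the notebook's `storage` and `vertex` imports and its `PROJECT_ID`/`BUCKET_URI` parameters; the artifact path, model display name, serving-container image tag, and machine type are assumptions to be checked against the Vertex AI prebuilt-container documentation.

```python
# Illustrative sketch, not the notebook's actual deployment code. Assumes the
# bucket behind BUCKET_URI exists and that a prebuilt Vertex AI XGBoost serving
# container compatible with the installed XGBoost version is available.
xgb_model.save_model("model.bst")  # file name the prebuilt XGBoost container expects

# Copy the local artifact into the staging bucket under an arbitrary prefix.
bucket = storage.Client(project=PROJECT_ID).bucket(BUCKET_URI.removeprefix("gs://"))
bucket.blob("taxi-fare/model/model.bst").upload_from_filename("model.bst")

# Register the artifact as a model in the Vertex AI Model Registry.
model = vertex.Model.upload(
    display_name="nyc-taxifare-xgboost",  # hypothetical display name
    artifact_uri=f"{BUCKET_URI}/taxi-fare/model",
    serving_container_image_uri=(
        "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
    ),
)

# Deploy to an endpoint and send one test row (10 features, in input_cols order)
# as a remote online-prediction request.
endpoint = model.deploy(machine_type="n1-standard-4")
print(endpoint.predict(instances=test_inputs.values[:1].tolist()).predictions)
```

The notebook itself may structure these steps differently; the sketch is only meant to make the deployment flow described in the Overview concrete.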
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org