derrickaw commented on code in PR #35568:
URL: https://github.com/apache/beam/pull/35568#discussion_r2208324854


##########
sdks/python/apache_beam/yaml/examples/transforms/ml/taxi-fare/custom_nyc_taxifare_model_deployment.ipynb:
##########
@@ -0,0 +1,809 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "source": [
+    "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 
2.0 (the \"License\")\n",
+    "\n",
+    "# Licensed to the Apache Software Foundation (ASF) under one\n",
+    "# or more contributor license agreements. See the NOTICE file\n",
+    "# distributed with this work for additional information\n",
+    "# regarding copyright ownership. The ASF licenses this file\n",
+    "# to you under the Apache License, Version 2.0 (the\n",
+    "# \"License\"); you may not use this file except in compliance\n",
+    "# with the License. You may obtain a copy of the License at\n",
+    "#\n",
+    "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+    "#\n",
+    "# Unless required by applicable law or agreed to in writing,\n",
+    "# software distributed under the License is distributed on an\n",
+    "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "# KIND, either express or implied. See the License for the\n",
+    "# specific language governing permissions and limitations\n",
+    "# under the License"
+   ],
+   "metadata": {
+    "id": "ZpDmaAwXuRnG"
+   },
+   "id": "ZpDmaAwXuRnG",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "# NYC Taxi Fare Prediction - Model Training and Deployment\n",
+    "\n",
+    "<table><tbody><tr>\n",
+    "  <td style=\"text-align: center\">\n",
+    "    <a 
href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ftaxi-fare%2Fcustom_nyc_taxifare_model_deployment.ipynb\";>\n",
+    "      <img alt=\"Google Cloud Colab Enterprise logo\" 
src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\";
 width=\"32px\"><br> Run in Colab Enterprise\n",
+    "    </a>\n",
+    "  </td>\n",
+    "  <td style=\"text-align: center\">\n",
+    "    <a 
href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi-fare/custom_nyc_taxifare_model_deployment.ipynb\";>\n",
+    "      <img alt=\"GitHub logo\" 
src=\"https://github.githubassets.com/assets/GitHub-Mark-ea2971cee799.png\"; 
width=\"32px\"><br> View on GitHub\n",
+    "    </a>\n",
+    "  </td>\n",
+    "</tr></tbody></table>\n"
+   ],
+   "metadata": {
+    "id": "m916RPCn0NSS"
+   },
+   "id": "m916RPCn0NSS"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Overview\n",
+    "\n",
+    "This notebook demonstrates the training and deployment of a custom 
tabular regression model for online prediction.\n",
+    "\n",
+    "We'll train a [gradient-boosted decision tree (GBDT) 
model](https://en.wikipedia.org/wiki/Gradient_boosting) using 
[XGBoost](https://xgboost.readthedocs.io/en/stable/index.html) to predict the 
fare of a taxi trip in New York City, given the information such as pick-up 
date and time, pick-up location, drop-off location and passenger count. The 
dataset is from the Kaggle competition 
https://www.kaggle.com/c/new-york-city-taxi-fare-prediction organized by Google 
Cloud.\n",
+    "\n",
+    "After model training and evaluation, we'll use Vertex AI Python SDK to 
upload this custom model to Vertex AI Model Registry and deploy it to perform 
remote inferences at scale. The prefered way to run this notebook is within 
Colab Enterprise.\n",
+    "\n",
+    "## Outline\n",
+    "1. Dataset\n",
+    "\n",
+    "2. Training\n",
+    "\n",
+    "3. Evaluation\n",
+    "\n",
+    "4. Deployment\n",
+    "\n",
+    "5. Reference"
+   ],
+   "metadata": {
+    "id": "jGbLxUoraooN"
+   },
+   "id": "jGbLxUoraooN"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "We first install and import the necessary libraries to run this notebook."
+   ],
+   "metadata": {
+    "id": "e6zO5wWaMhaX"
+   },
+   "id": "e6zO5wWaMhaX"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!pip3 install --quiet --upgrade \\\n",
+    "  opendatasets \\\n",
+    "  google-cloud-storage \\\n",
+    "  google-cloud-aiplatform \\\n",
+    "  scikit-learn \\\n",
+    "  xgboost \\\n",
+    "  pandas"
+   ],
+   "metadata": {
+    "id": "weUpgu9Y1OoF"
+   },
+   "id": "weUpgu9Y1OoF",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "import opendatasets as od\n",
+    "import pandas as pd\n",
+    "import random\n",
+    "import time\n",
+    "import os\n",
+    "\n",
+    "from xgboost import XGBRegressor\n",
+    "from sklearn.model_selection import train_test_split\n",
+    "from sklearn.metrics import root_mean_squared_error\n",
+    "\n",
+    "import google.cloud.storage as storage\n",
+    "import google.cloud.aiplatform as vertex"
+   ],
+   "metadata": {
+    "id": "KJTsSdQKSN_m"
+   },
+   "id": "KJTsSdQKSN_m",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Dataset\n",
+    "\n",
+    "We use the `opendatasets` library to programmatically download the 
dataset from Kaggle.\n",
+    "\n",
+    "We'll first need a Kaggle account and register for this competition. 
We'll also need the API key which is stored in `kaggle.json` file automatically 
downloaded when you create an API token. Go to *Profile* picture -> *Settings* 
-> *API* -> *Create New Token*.\n",
+    "\n",
+    "The dataset download will prompt you to enter your Kaggle username and 
key. Copy this information from `kaggle.json`.\n"
+   ],
+   "metadata": {
+    "id": "DVWcleCz1AVl"
+   },
+   "id": "DVWcleCz1AVl"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "dataset_url = 
'https://www.kaggle.com/c/new-york-city-taxi-fare-prediction'\n",
+    "od.download(dataset_url)"
+   ],
+   "metadata": {
+    "id": "8D-KUYKD1lg4"
+   },
+   "id": "8D-KUYKD1lg4",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Among the downloaded files, we will only make use of `test.csv` testing 
dataset and primarily `train.csv` training dataset for the purpose of training 
and evaluating our model."
+   ],
+   "metadata": {
+    "id": "NMCdiinpTF0W"
+   },
+   "id": "NMCdiinpTF0W"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "data_dir = 'new-york-city-taxi-fare-prediction'\n",
+    "!dir -l {data_dir}"
+   ],
+   "metadata": {
+    "id": "rmlERXShR457"
+   },
+   "id": "rmlERXShR457",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "The training dataset contains approx. 55M rows. Reading the entire 
dataset into a pandas DataFrame (i.e. loading the entire dataset into memory) 
is slow and memory-consuming that can affect operations in later parts of the 
notebook. And for the purpose of experimenting with our model, it is also 
unnecessary.\n",
+    "\n",
+    "A good practice is to sample some percentages of the training dataset."
+   ],
+   "metadata": {
+    "id": "Yv9Kq0v2T_1g"
+   },
+   "id": "Yv9Kq0v2T_1g"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "p = 0.01\n",
+    "# keep the header, then take only 1% of rows\n",
+    "# if random from [0,1] interval is greater than 0.01 the row will be 
skipped\n",
+    "df_train_val = pd.read_csv(\n",
+    "    data_dir + \"/train.csv\",\n",
+    "    header=0,\n",
+    "    parse_dates = ['pickup_datetime'],\n",
+    "    skiprows=lambda i: i > 0 and random.random() > p\n",
+    ")\n",
+    "df_train_val.shape"
+   ],
+   "metadata": {
+    "id": "epJNJkp1W7P_"
+   },
+   "id": "epJNJkp1W7P_",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "The training dataset, now as a DataFrame table, can be further inspected."
+   ],
+   "metadata": {
+    "id": "bzRYmrc-YDdd"
+   },
+   "id": "bzRYmrc-YDdd"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_train_val.columns"
+   ],
+   "metadata": {
+    "id": "AkJ2-w3BW7dD"
+   },
+   "id": "AkJ2-w3BW7dD",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_train_val.info()"
+   ],
+   "metadata": {
+    "id": "AxAMXNTiKe2D"
+   },
+   "id": "AxAMXNTiKe2D",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_train_val"
+   ],
+   "metadata": {
+    "id": "4LFxT3Zec8tX"
+   },
+   "id": "4LFxT3Zec8tX",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "The testing dataset is a lot smaller in size and doesn't have the 
`fare_amount` column. Likewise, we can read the dataset as a DataFrame and 
inspect the data."
+   ],
+   "metadata": {
+    "id": "xxPAnGR1ZDwf"
+   },
+   "id": "xxPAnGR1ZDwf"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_test = pd.read_csv(data_dir + \"/test.csv\", parse_dates = 
['pickup_datetime'])\n",
+    "df_test.columns"
+   ],
+   "metadata": {
+    "id": "cWBexIlsW_4u"
+   },
+   "id": "cWBexIlsW_4u",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_test"
+   ],
+   "metadata": {
+    "id": "bch6SYLxL51_"
+   },
+   "id": "bch6SYLxL51_",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "We'll set aside 20% of the training data as the validation set, to 
evaluate the model on previously unseen data."
+   ],
+   "metadata": {
+    "id": "SOeJDlsCZcY0"
+   },
+   "id": "SOeJDlsCZcY0"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_train, df_val = train_test_split(\n",
+    "    df_train_val,\n",
+    "    test_size=0.2,\n",
+    "    random_state=42 # set random_state to some constant so we always have 
the same training and validation data\n",
+    ")\n",
+    "\n",
+    "print(\"Training dataset's shape: \", df_train.shape)\n",
+    "print(\"Validation dataset's shape: \", df_val.shape)"
+   ],
+   "metadata": {
+    "id": "qVN1ygVGOH33"
+   },
+   "id": "qVN1ygVGOH33",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Training\n",
+    "\n",
+    "For a quick '0-to-1' model serving on Vertex AI, the model training 
process below is kept straighforward using the simple yet very effective 
[tree-based, gradient 
boosting](https://en.wikipedia.org/wiki/Gradient_boosting) algorithm. We start 
of with a simple feature engineering idea, before moving on to the actual 
training of the model using the 
[XGBoost](https://xgboost.readthedocs.io/en/stable/index.html) library.\n"
+   ],
+   "metadata": {
+    "id": "4Ov7Efuy1Gyj"
+   },
+   "id": "4Ov7Efuy1Gyj"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "### Simple Feature Engineering\n",
+    "\n",
+    "One of the columns in the dataset is the `pickup_datetime` column, which 
is of 
[datetimelike](https://pandas.pydata.org/docs/reference/api/pandas.Series.dt.html)
 type. This makes it incredibly easy for performing data analysis on 
time-series data such as this. However, ML models don't accept feature columns 
with such a custom data type that is not a number. Some sort of conversion is 
needed, and here we'll choose to break this datetime column into multiple 
feature columns.\n"
+   ],
+   "metadata": {
+    "id": "-CWgL07pNH7H"
+   },
+   "id": "-CWgL07pNH7H"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "def add_dateparts(df, col):\n",
+    "    \"\"\"\n",
+    "    This function splits the datetime column into separate column such 
as\n",
+    "    year, month, day, weekday, and hour\n",
+    "    :param df: DataFrame table to add the columns\n",
+    "    :param col: the column with datetime values\n",
+    "    :return: None\n",
+    "    \"\"\"\n",
+    "    df[col + '_year'] = df[col].dt.year\n",
+    "    df[col + '_month'] = df[col].dt.month\n",
+    "    df[col + '_day'] = df[col].dt.day\n",
+    "    df[col + '_weekday'] = df[col].dt.weekday\n",
+    "    df[col + '_hour'] = df[col].dt.hour"
+   ],
+   "metadata": {
+    "id": "O-qK4QVk1mD8"
+   },
+   "id": "O-qK4QVk1mD8",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "add_dateparts(df_train, 'pickup_datetime')\n",
+    "add_dateparts(df_val, 'pickup_datetime')\n",
+    "add_dateparts(df_test, 'pickup_datetime')"
+   ],
+   "metadata": {
+    "id": "huYKIwMvPS0H"
+   },
+   "id": "huYKIwMvPS0H",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_train.info()"
+   ],
+   "metadata": {
+    "id": "ovKvngO5SKeb"
+   },
+   "id": "ovKvngO5SKeb",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df_train.head()"
+   ],
+   "metadata": {
+    "id": "YhbHtOxAP-13"
+   },
+   "id": "YhbHtOxAP-13",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "### Gradient Boosting\n",
+    "\n",
+    "Predicting taxi fare is a supervised learning, regression problem and our 
dataset is tabular. It is well-known in common literatures (_[1]_, _[2]_) that 
[gradient-boosted decision tree (GBDT) 
model](https://en.wikipedia.org/wiki/Gradient_boosting) performs very well for 
this kind of problem and dataset type.\n",
+    "\n",
+    "The input columns used for training (and subsequently for inference) will 
be the original feature columns (pick-up/drop-off longitude/latitude and the 
passenger count) from the dataset, along with the additional engineered 
features (pick-up year, month, day, etc...) that we generated above. The 
target/label column for training is the fare amount column.\n"
+   ],
+   "metadata": {
+    "id": "7IX7HX71NNz-"
+   },
+   "id": "7IX7HX71NNz-"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "input_cols = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 
'dropoff_latitude', 'passenger_count',\n",
+    "              'pickup_datetime_year', 'pickup_datetime_month', 
'pickup_datetime_day', 'pickup_datetime_weekday',\n",
+    "              'pickup_datetime_hour']\n",
+    "\n",
+    "target_cols = 'fare_amount'\n",
+    "\n",
+    "train_inputs = df_train[input_cols]\n",
+    "train_targets = df_train[target_cols]\n",
+    "\n",
+    "val_inputs = df_val[input_cols]\n",
+    "val_targets = df_val[target_cols]\n",
+    "\n",
+    "test_inputs = df_test[input_cols]"
+   ],
+   "metadata": {
+    "id": "kASu8nxeNVEV"
+   },
+   "id": "kASu8nxeNVEV",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "As noted before, we use the XGBoost library which implements the GBDT 
machine learning algorithm in a scalable, distributed manner. Specifically, 
we'll use the 
[XGBoostRegressor](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.XGBRegressor)
 API and fit the training data by the standard squared-error loss function. The 
hyperparameters are chosen simply through trial-and-error and see which one 
gives the best result."
+   ],
+   "metadata": {
+    "id": "BQnLy75K1cIF"
+   },
+   "id": "BQnLy75K1cIF"
+  },
+  {
+   "metadata": {},
+   "cell_type": "code",
+   "outputs": [],
+   "execution_count": null,
+   "source": [
+    "xgb_model = XGBRegressor(objective='reg:squarederror',\n",
+    "                         n_jobs=-1,\n",
+    "                         random_state=42,\n",
+    "                         n_estimators=500,\n",
+    "                         max_depth=5,\n",
+    "                         learning_rate=0.05,\n",
+    "                         tree_method='hist',\n",
+    "                         subsample=0.8,\n",
+    "                         colsample_bytree=0.8)"
+   ],
+   "id": "af5fefaee2f81aa1"
+  },
+  {
+   "metadata": {},
+   "cell_type": "markdown",
+   "source": "**Note**: The model should be trained on array-like dataset 
(e.g. `numpy.ndarray`), instead of pandas DataFrame or Series object. This is 
to help passing/serializing input data in the request for remote inference 
later on, and to avoid a DataFrame/array-like mismatch error such as 
[this](https://datascience.stackexchange.com/questions/63872/lime-explainer-valueerror-training-data-did-not-have-the-following-fields).",
+   "id": "c0c95d56399f0968"
+  },
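+  {
+   "metadata": {},
+   "cell_type": "markdown",
+   "source": "As a quick, purely illustrative check (the feature values below are made up, not taken from the dataset), a single prediction instance is just a plain list of the 10 input feature values, in the same order as `input_cols`:",
+   "id": "b7d3c1e0a4f29c55"
+  },
+  {
+   "metadata": {},
+   "cell_type": "code",
+   "outputs": [],
+   "execution_count": null,
+   "source": [
+    "# Illustrative only: hypothetical pick-up/drop-off coordinates, passenger count\n",
+    "# and date parts, listed in the same order as input_cols defined above.\n",
+    "example_instance = [-73.98, 40.76, -73.97, 40.75, 1, 2015, 6, 15, 0, 17]\n",
+    "assert len(example_instance) == len(input_cols)"
+   ],
+   "id": "d41c2f7a8b9e0c31"
+  },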
+  {
+   "metadata": {},
+   "cell_type": "code",
+   "outputs": [],
+   "execution_count": null,
+   "source": "xgb_model.fit(train_inputs.values, train_targets.values)",
+   "id": "ba33a648416a5d71"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Evaluation\n",
+    "\n",
+    "A typical metric used for model evaluation is the root mean squared error 
(RMSE).\n"
+   ],
+   "metadata": {
+    "id": "4P7CaoHy1YXA"
+   },
+   "id": "4P7CaoHy1YXA"
+  },
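+  {
+   "metadata": {},
+   "cell_type": "markdown",
+   "source": "With $n$ validation examples, true fares $y_i$ and predicted fares $\\hat{y}_i$, the RMSE is\n\n$$\\mathrm{RMSE} = \\sqrt{\\frac{1}{n}\\sum_{i=1}^{n}\\left(y_i - \\hat{y}_i\\right)^2},$$\n\nreported in the same units as the fare amount. Lower is better.",
+   "id": "f3a6b5c4d2e19078"
+  },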
+  {
+   "cell_type": "code",
+   "id": "4x1Yx5kvj3DDcfH3AvpnPqBd",
+   "metadata": {
+    "tags": [],
+    "id": "4x1Yx5kvj3DDcfH3AvpnPqBd"
+   },
+   "source": [
+    "def evaluate(model):\n",
+    "    \"\"\"\n",
+    "    :param model: trained model to evaluate\n",
+    "    :return: a tuple of training and validation RMSE results\n",
+    "    \"\"\"\n",
+    "    train_preds = model.predict(train_inputs)\n",
+    "    train_rmse = root_mean_squared_error(train_targets, train_preds)\n",
+    "    val_preds = model.predict(val_inputs)\n",
+    "    val_rmse = root_mean_squared_error(val_targets, val_preds)\n",
+    "\n",
+    "    return train_rmse, val_rmse\n",
+    "\n",
+    "training_rmse, validation_rmse = evaluate(xgb_model)\n",
+    "print(\"Training RMSE: \", training_rmse)\n",
+    "print(\"Validation RMSE: \", validation_rmse)"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "We finally make use of the testing dataset by making model inference on 
this test data. The predicted label is the `predicted_fare_amount` column."
+   ],
+   "metadata": {
+    "id": "oY1cDTDQADns"
+   },
+   "id": "oY1cDTDQADns"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "test_preds = xgb_model.predict(test_inputs)\n",
+    "result_df = df_test.copy()\n",
+    "result_df['predicted_fare_amount'] = test_preds"
+   ],
+   "metadata": {
+    "id": "7Pn2lrrLX3qS"
+   },
+   "id": "7Pn2lrrLX3qS",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Deployment\n",
+    "\n",
+    "Once the model is finished training and evaluating, the next step is 
making model serving possible on Vertex AI.\n",
+    "\n",
+    "Initialize the Vertex AI SDK for Python for your project.\n"
+   ],
+   "metadata": {
+    "id": "CAPlN6Qf1nDx"
+   },
+   "id": "CAPlN6Qf1nDx"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "PROJECT_ID = \"your-project-id\"  # @param {type:\"string\"}\n",
+    "REGION = \"us-central1\"  # @param {type:\"string\"}\n",
+    "BUCKET_URI = \"gs://your-bucket-name\"  # @param {type:\"string\"}\n",
+    "\n",
+    "vertex.init(project=PROJECT_ID, location=REGION, 
staging_bucket=BUCKET_URI)\n",
+    "\n",
+    "print(f\"Project: {PROJECT_ID} | Region: {REGION}\")"
+   ],
+   "metadata": {
+    "id": "iZA3Wg3OcA5Q"
+   },
+   "id": "iZA3Wg3OcA5Q",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Save the trained model to the Google Cloud Storage bucket as model 
artifact."

Review Comment:
   ```suggestion
       "Save the trained model to the Google Cloud Storage bucket as a model 
artifact."
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
