[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-01 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r960524940


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +112,170 @@ def array_to_vector(col: Column) -> Column:
    return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object columns can contain different types, e.g. strings, dates, etc.,
+        # so inspect a row and check for array/list values
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model, or that the Spark executor environment already satisfies all runtime
+    requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.
+
+    For the conversion of the Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :----------------- | :----------- | :-------------- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |
+    | multi-col tensor   | N/A          | 4,2             |
+
+    Notes:
+    1. pass through dataframe column => model input as a single numpy array.
+    2. reshape flattened tensors into expected tensor shapes.
+    3. convert the entire dataframe into a single numpy array via df.to_numpy(), or the user can
+       use `pyspark.sql.functions.array()` to transform the input into a single-col tensor first.
+    4. pass through dataframe columns => model input as an (ordered) dictionary of numpy arrays.
+
+    Parameters
+    ----------
+    predict_batch_fn : Callable
+        Function which is responsible for loading a model and returning a `predict` function.
+    return_type : DataType
+        Spark SQL datatype for the expected output.
+        Default: ArrayType(FloatType())
+    batch_size : int
+        Batch size to use for inference. Note that this is typically a limitation of the model
+        and/or the hardware resources, and is usually smaller than the Spark partition size.
+        Default: -1, which sends the entire Spark partition to the model.
+    input_names : list[str]
+        Optional list of input names which will be used to map DataFrame column names to model
+        input names.  The order of names must match the order of the selected DataFrame columns.
+        If provided, the `predict()` function will be passed a dictionary of named inputs.
+    input_tensor_shapes : list[list[int]]
+        Optional list of input tensor shapes for models with tensor inputs.  Each tensor
+        input must be represented as a single DataFrame column containing a flattened 1-D array.
+        The order of the tensor shapes must match the order of the selected DataFrame columns.
+        Tabular datasets with scalar-valued columns should not supply this argument.
+
+    Returns
+    -------
+    A pandas_udf for predicting a batch.
+    """
+    # generate a new uuid each time this is invoked on the driver to invalidate the
+    # executor-side cache.
+    model_uuid = uuid.uuid4()
+
+    def predict(data: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
+        import pyspark.ml.executor_globals as exec_global
+
+        if exec_global.predict_fn and exec_global.model_uuid == model_uuid:
+            predict_fn = exec_global.predict_fn
+        else:
+            predict_fn = predict_batch_fn(**kwargs)
+            exec_global.predict_fn = predict_fn
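
For reference, a standalone sketch of how the `batched` and `has_tensor_cols` helpers above behave on toy pandas data (plain numpy/pandas only; not part of the PR):

```
import numpy as np
import pandas as pd

# scalar columns: numeric dtypes, so has_tensor_cols() would return False
scalar_df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [4.0, 5.0, 6.0]})
print(any(scalar_df.dtypes == np.object_))  # False

# tensor column: object dtype holding arrays, so has_tensor_cols() would return True
tensor_df = pd.DataFrame({"data": [np.zeros(4), np.ones(4), np.full(4, 2.0)]})
print(isinstance(tensor_df.iloc[0]["data"], np.ndarray))  # True

# batched(df, 2) yields groups of at most 2 rows via the groupby trick
for _, batch in tensor_df.groupby(np.arange(len(tensor_df)) // 2):
    print(len(batch))  # 2, then 1
```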

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-01 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r960529530


##
python/pyspark/ml/executor_globals.py:
##
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Module to hold globals for python processes on executors
+from typing import Callable, Optional
+from uuid import UUID
+
+
+model_uuid: Optional[UUID] = None
+predict_fn: Optional[Callable] = None

Review Comment:
   This is not the correct way to build a cache: it is not thread-safe. Many concurrent Spark tasks (possibly from different Spark jobs) may be spawned on the executor side.
   
   To cache the model, you can copy the MLflow code at:
   https://github.com/mlflow/mlflow/blob/master/mlflow/pyfunc/spark_model_cache.py
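   
   For illustration, a minimal thread-safe, executor-side cache along those lines might look like this sketch (the name `PredictFnCache` and its API are hypothetical here, not MLflow's or the PR's code):
   
   ```
   import threading
   from typing import Callable, Dict
   from uuid import UUID
   
   class PredictFnCache:
       """Hypothetical per-worker cache of predict functions, keyed by model UUID."""
   
       _lock = threading.Lock()
       _cache: Dict[UUID, Callable] = {}
   
       @classmethod
       def get_or_load(cls, model_uuid: UUID, loader: Callable[[], Callable]) -> Callable:
           # hold the lock so concurrent task threads don't load the same model twice
           with cls._lock:
               fn = cls._cache.get(model_uuid)
               if fn is None:
                   fn = loader()  # load/init once per python worker
                   cls._cache[model_uuid] = fn
               return fn
   ```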
   






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2023-01-09 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1064640372


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +138,605 @@ def array_to_vector(col: Column) -> Column:
    return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        df = data
+    elif isinstance(data, pd.Series):
+        df = pd.concat((data,), axis=1)
+    else:  # isinstance(data, Tuple[pd.Series]):
+        df = pd.concat(data, axis=1)
+
+    index = 0
+    data_size = len(df)
+    while index < data_size:
+        yield df.iloc[index : index + batch_size]
+        index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+    if isinstance(data, pd.Series):
+        return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
+    elif isinstance(data, pd.DataFrame):
+        return any(data.dtypes == np.object_) and any(
+            [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+        )
+    else:
+        raise ValueError(
+            "Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data))
+        )
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
+    """Check if input Series/DataFrame/Tuple contains any tensor-valued columns."""
+    if isinstance(data, (pd.Series, pd.DataFrame)):
+        return _is_tensor_col(data)
+    else:  # isinstance(data, Tuple):
+        return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate_and_transform_multiple_inputs(
+    batch: pd.DataFrame, input_shapes: List[List[int] | None], num_input_cols: int
+) -> List[np.ndarray]:
+    multi_inputs = [batch[col].to_numpy() for col in batch.columns]
+    if input_shapes:
+        if len(input_shapes) == num_input_cols:
+            multi_inputs = [
+                np.vstack(v).reshape([-1] + input_shapes[i])  # type: ignore
+                if input_shapes[i]
+                else v
+                for i, v in enumerate(multi_inputs)
+            ]
+            if not all([len(x) == len(batch) for x in multi_inputs]):
+                raise ValueError("Input data does not match expected shape.")
+        else:
+            raise ValueError("input_tensor_shapes must match columns")
+
+    return multi_inputs
+
+
+def _validate_and_transform_single_input(
+    batch: pd.DataFrame,
+    input_shapes: List[List[int] | None],
+    has_tensors: bool,
+    has_tuple: bool,
+) -> np.ndarray:
+    # multiple input columns for single expected input
+    if has_tensors:
+        # tensor columns
+        if len(batch.columns) == 1:
+            # one tensor column and one expected input, vstack rows
+            single_input = np.vstack(batch.iloc[:, 0])
+        else:
+            raise ValueError(
+                "Multiple input columns found, but model expected a single "
+                "input, use `struct` or `array` to combine columns into tensors."
+            )
+    else:
+        # scalar columns
+        if len(batch.columns) == 1:
+            # single scalar column, remove extra dim
+            single_input = np.squeeze(batch.to_numpy())
+            if input_shapes and input_shapes[0] not in [None, [], [1]]:
+                raise ValueError("Invalid input_tensor_shape for scalar column.")
+        elif not has_tuple:
+            # columns grouped via struct/array, convert to single tensor
+            single_input = batch.to_numpy()
+            if input_shapes and input_shapes[0] != [len(batch.columns)]:
+                raise ValueError("Input data does not match expected shape.")
+        else:
+            raise ValueError(
+                "Multiple input columns found, but model expected a single "
+                "input, use `struct` or `array` to combine columns into tensors."
+            )
+
+    # if input_tensor_shapes provided, try to reshape input
+    if input_shapes:
+        if len(input_shapes) == 1:
+            single_input = single_input.reshape([-1] + input_shapes[0])  # type: ignore
+            if len(single_input) != len(batch):
+                raise ValueError("Input data does not match expected shape.")
+        else:
+            raise ValueError("Multiple input_tensor_shapes found, but model expected one input")
+
+    return single_input
+
+
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
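
To make the reshape logic in `_validate_and_transform_single_input` above concrete, here is a tiny standalone sketch (toy shapes, not the PR's code) of how a flattened tensor column is restored to its expected shape:

```
import numpy as np
import pandas as pd

# a batch of 4 flattened 28x28 images stored in a single object-dtype column
batch = pd.DataFrame({"data": [np.zeros(784) for _ in range(4)]})

# vstack the per-row arrays, then reshape as if input_tensor_shapes=[[28, 28]]
single_input = np.vstack(batch.iloc[:, 0]).reshape([-1] + [28, 28])
assert single_input.shape == (4, 28, 28)
```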

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-10-29 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1008791685


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
    return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        for _, batch in data.groupby(np.arange(len(data)) // batch_size):

Review Comment:
   Using `data.groupby` is not performant. Why not use a simple loop? For example:
   
   ```
   index = 0
   data_size = len(data)
   while index < data_size:
       yield data.iloc[index : index + batch_size]
       index += batch_size
   ```
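   
   A self-contained sketch for comparing the two batching strategies on toy data (timings are machine-dependent, so none are claimed here):
   
   ```
   import numpy as np
   import pandas as pd
   from timeit import timeit
   
   df = pd.DataFrame({"x": np.arange(100_000)})
   
   def via_groupby(df, batch_size):
       # hash-based split over computed group labels
       for _, b in df.groupby(np.arange(len(df)) // batch_size):
           yield b
   
   def via_slicing(df, batch_size):
       # simple positional slices, no grouping machinery
       index, n = 0, len(df)
       while index < n:
           yield df.iloc[index : index + batch_size]
           index += batch_size
   
   print(timeit(lambda: sum(len(b) for b in via_groupby(df, 1000)), number=10))
   print(timeit(lambda: sum(len(b) for b in via_slicing(df, 1000)), number=10))
   ```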




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-10-30 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1008825577


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
    return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        for _, batch in data.groupby(np.arange(len(data)) // batch_size):
+            yield batch
+    else:
+        # convert (tuple of) pd.Series into pd.DataFrame
+        if isinstance(data, pd.Series):
+            df = pd.concat((data,), axis=1)
+        else:  # isinstance(data, Tuple[pd.Series]):
+            df = pd.concat(data, axis=1)
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if isinstance(data, pd.Series):
+        return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
+    elif isinstance(data, pd.DataFrame):
+        return any(data.dtypes == np.object_) and any(
+            [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+        )
+    else:  # isinstance(data, Tuple):
+        return any([d.dtype == np.object_ for d in data]) and any(
+            [isinstance(d.iloc[0], (np.ndarray, list)) for d in data]
+        )
+
+
+def predict_batch_udf(
+    predict_batch_fn: Callable[
+        [],
+        Callable[
+            [np.ndarray | List[np.ndarray]],
+            np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, np.dtype]],
+        ],
+    ],
+    *,
+    return_type: DataType,
+    batch_size: int,
+    input_tensor_shapes: list[list[int] | None] | Mapping[int, list[int]] | None = None,
+) -> UserDefinedFunctionLike:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model, or that the Spark executor environment already satisfies all runtime
+    requirements.
+
+    For the conversion of the Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :----------------- | :----------- | :-------------- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |
+    | multi-col tensor   | N/A          | 4,2             |
+
+    Notes:
+    1. pass through dataframe column => model input as a single numpy array.
+    2. reshape flattened tensors into expected tensor shapes.
+    3. user must use `pyspark.sql.functions.struct()` or `pyspark.sql.functions.array()` to
+       combine multiple input columns into the equivalent of a single-col tensor.
+    4. pass through dataframe columns => model input as an ordered list of numpy arrays.
+
+    Example (single-col tensor):
+
+    Input DataFrame has a single column with a flattened tensor value, represented as an array
+    of float.
+    ```
+    from pyspark.ml.functions import predict_batch_udf
+
+    def predict_batch_fn():
+        # load/init happens once per python worker
+        import tensorflow as tf
+        model = tf.keras.models.load_model('/path/to/mnist_model')
+
+        # predict on batches of tasks/partitions, using cached model
+        def predict(inputs: np.ndarray) -> np.ndarray:
+            # inputs.shape = [batch_size, 784]
+            # outputs.shape = [batch_size, 10], return_type = ArrayType(FloatType())
+            return model.predict(inputs)
+
+        return predict
+
+    mnist = predict_batch_udf(predict_batch_fn,
+                              return_type=ArrayType(FloatType()),
+                              batch_size=100,
+                              input_tensor_shapes=[[784]])
+
+    df = spark.read.parquet("/path/to/mnist_data")
+    df.show(5)
+    # +--------------------+
+    # |                data|
+    # +--------------------+
+    # |[0.0, 0.0, 0.0, 0...|
+    # |[0.0, 0.0, 0.0, 0...|
+    # |[0.0, 0.0, 0.0, 0...|
+    # |[0.0, 0.0, 0.0, 0...|
+    # |[0.0, 0.0, 0.0, 0...|
+    # +--------------------+
+
+    df.withColumn("preds", mnist("data")).show(5)
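
To illustrate note 3 in the docstring above (combining multiple scalar columns into a single tensor input), a hedged sketch; the toy "model", column names, and data are illustrative, not from the PR, and a running `spark` session is assumed:

```
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.functions import array
from pyspark.sql.types import ArrayType, FloatType
import numpy as np

def make_sum_fn():
    def predict(inputs: np.ndarray) -> np.ndarray:
        # inputs.shape = [batch_size, 2]; a stand-in "model" that sums the features
        return inputs.sum(axis=1, keepdims=True)
    return predict

sum_udf = predict_batch_udf(make_sum_fn,
                            return_type=ArrayType(FloatType()),
                            batch_size=100)

df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["a", "b"])
df.withColumn("preds", sum_udf(array("a", "b"))).show()
```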

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-10-30 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1008798333



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-10-30 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1008845792



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-01 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r108516



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-01 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r109299



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-03 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1012905955



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013946849


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size

Review Comment:
   The code snippet occurs in 2 places; let's extract it into a function `_convert_pandas_df_to_batch_iter`.
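
As a minimal sketch of the suggested extraction (the helper name comes from the comment above; the placement and exact signature are assumptions):
```
from typing import Iterator

import pandas as pd


def _convert_pandas_df_to_batch_iter(df: pd.DataFrame, batch_size: int) -> Iterator[pd.DataFrame]:
    """Yield successive batch_size-row slices of df."""
    index = 0
    data_size = len(df)
    while index < data_size:
        yield df.iloc[index : index + batch_size]
        index += batch_size
```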



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013951701


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+for _, batch in data.groupby(np.arange(len(data)) // batch_size):
+yield batch
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+yield batch
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input DataFrame contains any tensor-valued columns"""
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:  # isinstance(data, Tuple):
+return any([d.dtype == np.object_ for d in data]) and any(
+[isinstance(d.iloc[0], (np.ndarray, list)) for d in data]
+)
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: list[list[int] | None] | Mapping[int, list[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already satisfies all 
runtime requirements.
+
+For the conversion of Spark DataFrame to numpy, the following table 
describes the behavior,
+where tensor columns in the Spark DataFrame must be represented as a 
flattened 1-D array/list.
+
+| dataframe \\ model | single input | multiple inputs |
+| :- | :--- | :-- |
+| single-col scalar  | 1| N/A |
+| single-col tensor  | 1,2  | N/A |
+| multi-col scalar   | 3| 4   |
+| multi-col tensor   | N/A  | 4,2 |
+
+Notes:
+1. pass thru dataframe column => model input as single numpy array.
+2. reshape flattened tensors into expected tensor shapes.
+3. user must use `pyspark.sql.functions.struct()` or 
`pyspark.sql.functions.array()` to
+   combine multiple input columns into the equivalent of a single-col 
tensor.
+4. pass thru dataframe column => model input as an ordered list of numpy 
arrays.
+
+Example (single-col tensor):
+
+Input DataFrame has a single column with a flattened tensor value, 
represented as an array of
+float.
+```
+from pyspark.ml.functions import predict_batch_udf
+
+def predict_batch_fn():
+# load/init happens once per python worker
+import tensorflow as tf
+model = tf.keras.models.load_model('/path/to/mnist_model')
+
+# predict on batches of tasks/partitions, using cached model
+def predict(inputs: np.ndarray) -> np.ndarray:
+# inputs.shape = [batch_size, 784]
+# outputs.shape = [batch_size, 10], return_type = 
ArrayType(FloatType())
+return model.predict(inputs)
+
+return predict
+
+mnist = predict_batch_udf(predict_batch_fn,
+  return_type=ArrayType(FloatType()),
+  batch_size=100,
+  input_tensor_shapes=[[784]])
+
+df = spark.read.parquet("/path/to/mnist_data")
+df.show(5)
+# ++
+# |data|
+# ++
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# ++
+
+df.withColumn("preds", mnist("data")).show(5)
+# ++--

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013952829


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+for _, batch in data.groupby(np.arange(len(data)) // batch_size):
+yield batch
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+yield batch
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input DataFrame contains any tensor-valued columns"""
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:  # isinstance(data, Tuple):
+return any([d.dtype == np.object_ for d in data]) and any(
+[isinstance(d.iloc[0], (np.ndarray, list)) for d in data]
+)
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: list[list[int] | None] | Mapping[int, list[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already satisfies all 
runtime requirements.
+
+For the conversion of Spark DataFrame to numpy, the following table 
describes the behavior,
+where tensor columns in the Spark DataFrame must be represented as a 
flattened 1-D array/list.
+
+| dataframe \\ model | single input | multiple inputs |
+| :- | :--- | :-- |
+| single-col scalar  | 1| N/A |
+| single-col tensor  | 1,2  | N/A |
+| multi-col scalar   | 3| 4   |
+| multi-col tensor   | N/A  | 4,2 |
+
+Notes:
+1. pass thru dataframe column => model input as single numpy array.
+2. reshape flattened tensors into expected tensor shapes.
+3. user must use `pyspark.sql.functions.struct()` or 
`pyspark.sql.functions.array()` to
+   combine multiple input columns into the equivalent of a single-col 
tensor.
+4. pass thru dataframe column => model input as an ordered list of numpy 
arrays.
+
+Example (single-col tensor):
+
+Input DataFrame has a single column with a flattened tensor value, 
represented as an array of
+float.
+```
+from pyspark.ml.functions import predict_batch_udf
+
+def predict_batch_fn():
+# load/init happens once per python worker
+import tensorflow as tf
+model = tf.keras.models.load_model('/path/to/mnist_model')
+
+# predict on batches of tasks/partitions, using cached model
+def predict(inputs: np.ndarray) -> np.ndarray:
+# inputs.shape = [batch_size, 784]
+# outputs.shape = [batch_size, 10], return_type = 
ArrayType(FloatType())
+return model.predict(inputs)
+
+return predict
+
+mnist = predict_batch_udf(predict_batch_fn,
+  return_type=ArrayType(FloatType()),
+  batch_size=100,
+  input_tensor_shapes=[[784]])
+
+df = spark.read.parquet("/path/to/mnist_data")
+df.show(5)
+# ++
+# |data|
+# ++
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# |[0.0, 0.0, 0.0, 0...|
+# ++
+
+df.withColumn("preds", mnist("data")).show(5)
+# ++--

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013958443


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")

Review Comment:
   Q: Shall we support this case? I think we should regard it as an illegal case.
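
To make the case under discussion concrete, the two prediction shapes look roughly like this (values are illustrative only):
```
import numpy as np

# Dictionary of columns: one array per StructType field, length == num_input_rows.
preds_as_columns = {"label": np.array([0, 1]), "score": np.array([0.9, 0.8])}

# Rows of dictionaries (the case questioned above): one dict per input row.
preds_as_rows = [{"label": 0, "score": 0.9}, {"label": 1, "score": 0.8}]
```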



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013960675


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if len(predNames) != len(fieldNames) or not all(
+[predNames[i] == fieldNames[i] for i in range(len(fieldNames))]
+):

Review Comment:
   The check here forces the returned dict to have the same key order as the struct type field list. I think that is not necessary (note that in some cases the dict key order is undefined); we can simply check `set(predNames) == set(fieldNames)` instead.
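
A sketch of the suggested order-insensitive check, with stand-in values for `fieldNames` and `predNames`:
```
fieldNames = ["label", "score"]  # from the StructType return_type
predNames = ["score", "label"]   # keys of the returned prediction dict

# Compare as sets, since dict key order should not be significant here.
if set(predNames) != set(fieldNames):
    raise ValueError(
        "Prediction result columns did not match expected return_type "
        "columns: expected {}, got: {}".format(fieldNames, predNames)
    )
```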



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013971238


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if len(predNames) != len(fieldNames) or not all(
+[predNames[i] == fieldNames[i] for i in range(len(fieldNames))]
+):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:

Review Comment:
   We need to add an additional check: `len(preds.shape) == 2`.
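
I.e., for ArrayType results the ndarray should carry one array value per input row, so a check along these lines (with stand-in data):
```
import numpy as np

preds = np.ones((4, 10))  # e.g. 4 input rows, 10 values per row
num_input_rows = 4

if len(preds.shape) != 2:
    raise ValueError("Prediction results for ArrayType must be two-dimensional.")
if len(preds) != num_input_rows:
    raise ValueError("Prediction results must have same length as input data.")
```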



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013972337


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):

Review Comment:
   I suggest we support the case of a StructType containing a field of ArrayType; this is a common case.
   Correspondingly, if we support this, let's add a similar check to
https://github.com/apache/spark/pull/37734/files#r1013971238
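
For example, the suggested case (a hypothetical model returning one scalar field and one array field):
```
import numpy as np
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType

return_type = StructType(
    [
        StructField("label", FloatType()),                     # scalar per row
        StructField("probabilities", ArrayType(FloatType())),  # array per row
    ]
)

# Dictionary-of-columns prediction: the ArrayType field is a 2-D ndarray with
# one row of probabilities per input row, so the scalar-only shape check
# quoted above would need to be relaxed for such fields.
preds = {
    "label": np.array([0.0, 1.0]),
    "probabilities": np.array([[0.9, 0.1], [0.2, 0.8]]),
}
```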



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013973661


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if len(predNames) != len(fieldNames) or not all(
+[predNames[i] == fieldNames[i] for i in range(len(fieldNames))]
+):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError("Prediction results for ArrayType must be an 
ndarray.")
+else:
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already satisfies all runtime requirements.

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015400166


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")

Review Comment:
   Interesting, ok. Have you documented it well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015401611


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")

Review Comment:
   @leewyang Please also add a comment saying this case is for supporting models like the
[Huggingface pipeline for sentiment analysis](https://huggingface.co/docs/transformers/quicktour#pipeline-usage).
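
For reference, a short sketch of the referenced pattern; the Huggingface `pipeline` returns a list of per-row dictionaries (the model is downloaded implicitly, and the exact scores shown are illustrative):
```
from transformers import pipeline

classifier = pipeline("sentiment-analysis")
preds = classifier(["We are very happy.", "We hope you don't hate it."])
# preds is a list of dicts, one per input row, e.g.:
# [{'label': 'POSITIVE', 'score': 0.999}, {'label': 'NEGATIVE', 'score': 0.531}]
```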



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015403181


##
python/pyspark/ml/functions.py:
##
@@ -162,20 +156,25 @@ def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
 return any(_is_tensor_col(elem) for elem in data)
 
 
-def _validate(
+def _validate_and_transform(

Review Comment:
   Nit: rename to `_validate_and_transform_prediction_result`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015422819


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,543 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+df = data
+elif isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate_and_transform(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+"""Validate numpy-based model predictions against the expected pandas_udf 
return_type and
+transforms the predictions into an equivalent pandas DataFrame or 
Series."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+for field in struct_rtype.fields:
+if len(preds[field.name]) != num_input_rows:
+raise ValueError("Prediction results must have same length 
as input data.")
+if field.dataType == ArrayType and preds[field.name].shape != 
2:
+raise ValueError("Prediction results for ArrayType must be 
two-dimensional.")
+
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if set(predNames) != set(fieldNames):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+
+return pd.DataFrame(preds)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+if len(preds.shape) != 2:
+raise ValueError("Prediction results for ArrayType must be 
two-dimensional.")
+else:
+raise ValueError("Prediction results for ArrayType must be an 
ndarray.")
+
+return pd.Series(list(preds))
+else:  # scalar
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+
+return pd.Series(np.squeeze(preds))  # type: ignore
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015422322


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,543 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+df = data
+elif isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate_and_transform(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+"""Validate numpy-based model predictions against the expected pandas_udf 
return_type and
+transforms the predictions into an equivalent pandas DataFrame or 
Series."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+for field in struct_rtype.fields:
+if len(preds[field.name]) != num_input_rows:
+raise ValueError("Prediction results must have same length 
as input data.")
+if field.dataType == ArrayType and preds[field.name].shape != 
2:
+raise ValueError("Prediction results for ArrayType must be 
two-dimensional.")
+
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if set(predNames) != set(fieldNames):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+
+return pd.DataFrame(preds)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+if len(preds.shape) != 2:
+raise ValueError("Prediction results for ArrayType must be 
two-dimensional.")
+else:
+raise ValueError("Prediction results for ArrayType must be an 
ndarray.")
+
+return pd.Series(list(preds))
+else:  # scalar
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+
+return pd.Series(np.squeeze(preds))  # type: ignore
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+

Review Comment:
   Add a line here saying `.. versionadded:: 3.4.0`, like the other APIs.
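
I.e., a one-line Sphinx directive near the top of the docstring, as the comment suggests (3.4.0 is the version stated above):
```
"""Given a function which loads a model, returns a pandas_udf for inferencing over that model.

.. versionadded:: 3.4.0
"""
```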



-- 
This is an automated message from the Apache Git Service.

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015423592


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,543 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+df = data
+elif isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate_and_transform(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+"""Validate numpy-based model predictions against the expected pandas_udf 
return_type and
+transforms the predictions into an equivalent pandas DataFrame or 
Series."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+for field in struct_rtype.fields:
+if len(preds[field.name]) != num_input_rows:
+raise ValueError("Prediction results must have same length 
as input data.")
+if field.dataType == ArrayType and preds[field.name].shape != 
2:
+raise ValueError("Prediction results for ArrayType must be 
two-dimensional.")
+
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if set(predNames) != set(fieldNames):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+
+return pd.DataFrame(preds)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+if len(preds.shape) != 2:
+raise ValueError("Prediction results for ArrayType must be 
two-dimensional.")
+else:
+raise ValueError("Prediction results for ArrayType must be an 
ndarray.")
+
+return pd.Series(list(preds))
+else:  # scalar
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+
+return pd.Series(np.squeeze(preds))  # type: ignore
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015427575


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,543 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        df = data
+    elif isinstance(data, pd.Series):
+        df = pd.concat((data,), axis=1)
+    else:  # isinstance(data, Tuple[pd.Series]):
+        df = pd.concat(data, axis=1)
+
+    index = 0
+    data_size = len(df)
+    while index < data_size:
+        yield df.iloc[index : index + batch_size]
+        index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+    if isinstance(data, pd.Series):
+        return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
+    elif isinstance(data, pd.DataFrame):
+        return any(data.dtypes == np.object_) and any(
+            [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+        )
+    else:
+        raise ValueError(
+            "Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data))
+        )
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
+    """Check if input Series/DataFrame/Tuple contains any tensor-valued columns."""
+    if isinstance(data, (pd.Series, pd.DataFrame)):
+        return _is_tensor_col(data)
+    else:  # isinstance(data, Tuple):
+        return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate_and_transform(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if len(preds[field.name]) != num_input_rows:
+                    raise ValueError("Prediction results must have same length as input data.")
+                if field.dataType == ArrayType and preds[field.name].shape != 2:
+                    raise ValueError("Prediction results for ArrayType must be two-dimensional.")
+
+        elif isinstance(preds, list) and isinstance(preds[0], dict):
+            # rows of dictionaries
+            predNames = list(preds[0].keys())
+            if len(preds) != num_input_rows:
+                raise ValueError("Prediction results must have same length as input data.")
+        else:
+            raise ValueError(
+                "Prediction results for StructType must be a dictionary or "
+                "a list of dictionary, got: {}".format(type(preds))
+            )
+
+        # check column names
+        if set(predNames) != set(fieldNames):
+            raise ValueError(
+                "Prediction result columns did not match expected return_type "
+                "columns: expected {}, got: {}".format(fieldNames, predNames)
+            )
+
+        return pd.DataFrame(preds)
+    elif isinstance(return_type, ArrayType):
+        if isinstance(preds, np.ndarray):
+            if len(preds) != num_input_rows:
+                raise ValueError("Prediction results must have same length as input data.")
+            if len(preds.shape) != 2:
+                raise ValueError("Prediction results for ArrayType must be two-dimensional.")
+        else:
+            raise ValueError("Prediction results for ArrayType must be an ndarray.")
+
+        return pd.Series(list(preds))
+    else:  # scalar
+        if len(preds) != num_input_rows:
+            raise ValueError("Prediction results must have same length as input data.")
+
+        return pd.Series(np.squeeze(preds))  # type: ignore
+
+
+def predict_batch_udf(
+    predict_batch_fn: Callable[
+        [],
+        Callable[
+            [np.ndarray | List[np.ndarray]],
+            np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, np.dtype]],
+        ],
+    ],
+    *,
+    return_type: DataType,
+    batch_size: int,
+    input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | None = None,
+) -> UserDefinedFunctionLike:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that
+    model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent
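
As a quick illustration of the `_batched` generator quoted above (plain pandas;
not part of the PR itself):

    import pandas as pd

    df = pd.DataFrame({"x": range(10)})
    # _batched yields row slices of at most batch_size rows: 3, 3, 3, then 1
    for chunk in _batched(df, batch_size=3):
        print(len(chunk))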

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1015466019


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,543 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if len(preds[field.name]) != num_input_rows:
+                    raise ValueError("Prediction results must have same length as input data.")
+                if field.dataType == ArrayType and preds[field.name].shape != 2:

Review Comment:
   For the case `field.dataType == ArrayType`, you need to add an additional conversion step: `preds[field.name] = list(preds[field.name])`; otherwise, constructing the result pandas dataframe will raise an error (similar to what your current code does in L206).
   
   Please also add a test for this case.
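
A minimal standalone sketch of the suggested conversion (plain numpy/pandas, not
the PR's final code):

    import numpy as np
    import pandas as pd

    preds = {"probs": np.array([[0.1, 0.9], [0.8, 0.2]])}
    # Without this step, pd.DataFrame(preds) raises, since per-column arrays
    # must be one-dimensional.
    preds["probs"] = list(preds["probs"])
    df = pd.DataFrame(preds)  # one 1-D array cell per row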




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016111000


##
python/pyspark/ml/functions.py:
##
@@ -522,7 +529,11 @@ def predict_columnar(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]
         output), a dictionary of named numpy arrays (for multiple outputs), or a row-oriented list
         of dictionaries (for multiple outputs).
     return_type : :class:`pyspark.sql.types.DataType` or str.
-        Spark SQL datatype for the expected output.
+        Spark SQL datatype for the expected output:
+        - ArrayType --> 2-dim numpy array.
+        - StructType --> dict with keys matching struct fields.
+        - StructType --> list of dict with keys matching struct fields, for models like the

Review Comment:
   One more comment: please also add a description for the scalar type (IntegerType, DoubleType, etc.) case.
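
For example, the scalar case could be documented and exercised along these lines
(a sketch assuming the predict_batch_udf signature shown in the quoted diffs;
make_times_two_fn is a hypothetical loader):

    import numpy as np
    from pyspark.ml.functions import predict_batch_udf
    from pyspark.sql.types import DoubleType

    def make_times_two_fn():
        def predict(inputs: np.ndarray) -> np.ndarray:
            # scalar return_type --> 1-D numpy array, one value per input row
            return inputs * 2.0
        return predict

    times_two_udf = predict_batch_udf(make_times_two_fn, return_type=DoubleType(), batch_size=100)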



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016156053



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016156186



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016156583



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016161886


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,556 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if isinstance(field.dataType, ArrayType):
+                    if len(preds[field.name].shape) == 2:
+                        preds[field.name] = list(preds[field.name])
+                    else:
+                        raise ValueError(
+                            "Prediction results for ArrayType must be two-dimensional."
+                        )
+                if len(preds[field.name]) != num_input_rows:
+                    raise ValueError("Prediction results must have same length as input data")
+
+        elif isinstance(preds, list) and isinstance(preds[0], dict):
+            # rows of dictionaries
+            predNames = list(preds[0].keys())
+            if len(preds) != num_input_rows:

Review Comment:
   Please add the additional check here as well, per https://github.com/apache/spark/pull/37734/files#r1016156583.
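
That referenced comment is truncated in this archive, so the following is only an
illustrative guess at the kind of per-row validation meant for the
rows-of-dictionaries branch (names follow the quoted code):

    # Hypothetical sketch -- the exact check requested lives in the truncated comment.
    for row in preds:
        for field in struct_rtype.fields:
            value = row[field.name]
            if isinstance(field.dataType, ArrayType):
                if np.ndim(value) != 1:
                    raise ValueError("Prediction results for ArrayType must be one-dimensional per row.")
            elif np.ndim(value) != 0:
                raise ValueError("Prediction results for scalar fields must be scalar per row.")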






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016162543


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,556 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if isinstance(field.dataType, ArrayType):

Review Comment:
   For the other cases (non-ArrayType fields), please check that the value is one-dimensional.






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016163175


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,556 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if isinstance(field.dataType, ArrayType):
+                    if len(preds[field.name].shape) == 2:
+                        preds[field.name] = list(preds[field.name])
+                    else:
+                        raise ValueError(
+                            "Prediction results for ArrayType must be two-dimensional."
+                        )
+                if len(preds[field.name]) != num_input_rows:
+                    raise ValueError("Prediction results must have same length as input data")
+
+        elif isinstance(preds, list) and isinstance(preds[0], dict):
+            # rows of dictionaries
+            predNames = list(preds[0].keys())
+            if len(preds) != num_input_rows:
+                raise ValueError("Prediction results must have same length as input data.")
+        else:
+            raise ValueError(
+                "Prediction results for StructType must be a dictionary or "
+                "a list of dictionary, got: {}".format(type(preds))
+            )
+
+        # check column names
+        if set(predNames) != set(fieldNames):
+            raise ValueError(
+                "Prediction result columns did not match expected return_type "
+                "columns: expected {}, got: {}".format(fieldNames, predNames)
+            )
+
+        return pd.DataFrame(preds)
+    elif isinstance(return_type, ArrayType):
+        if isinstance(preds, np.ndarray):
+            if len(preds) != num_input_rows:
+                raise ValueError("Prediction results must have same length as input data.")
+            if len(preds.shape) != 2:
+                raise ValueError("Prediction results for ArrayType must be two-dimensional.")
+        else:
+            raise ValueError("Prediction results for ArrayType must be an ndarray.")
+
+        return pd.Series(list(preds))
+    else:  # scalar
+        if len(preds) != num_input_rows:

Review Comment:
   Please add an additional check that each element in `preds` is a scalar, or an array containing only one scalar element.
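
Concretely (a standalone sketch, not the PR's final code), the scalar branch
would accept shapes like these:

    import numpy as np

    preds = np.array([[1.0], [2.0]])  # single-element arrays per row are acceptable
    assert preds.ndim == 1 or (preds.ndim == 2 and preds.shape[1] == 1)
    np.squeeze(preds)                 # -> array([1., 2.]), one scalar per input row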




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016162543


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,556 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if isinstance(field.dataType, ArrayType):

Review Comment:
   Add a check: for the other cases (non-ArrayType fields), please verify that the value is one-dimensional, or a two-dimensional array whose second dimension is 1.






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016163175


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,556 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if isinstance(field.dataType, ArrayType):
+                    if len(preds[field.name].shape) == 2:
+                        preds[field.name] = list(preds[field.name])
+                    else:
+                        raise ValueError(
+                            "Prediction results for ArrayType must be two-dimensional."
+                        )
+                if len(preds[field.name]) != num_input_rows:
+                    raise ValueError("Prediction results must have same length as input data")
+
+        elif isinstance(preds, list) and isinstance(preds[0], dict):
+            # rows of dictionaries
+            predNames = list(preds[0].keys())
+            if len(preds) != num_input_rows:
+                raise ValueError("Prediction results must have same length as input data.")
+        else:
+            raise ValueError(
+                "Prediction results for StructType must be a dictionary or "
+                "a list of dictionary, got: {}".format(type(preds))
+            )
+
+        # check column names
+        if set(predNames) != set(fieldNames):
+            raise ValueError(
+                "Prediction result columns did not match expected return_type "
+                "columns: expected {}, got: {}".format(fieldNames, predNames)
+            )
+
+        return pd.DataFrame(preds)
+    elif isinstance(return_type, ArrayType):
+        if isinstance(preds, np.ndarray):
+            if len(preds) != num_input_rows:
+                raise ValueError("Prediction results must have same length as input data.")
+            if len(preds.shape) != 2:
+                raise ValueError("Prediction results for ArrayType must be two-dimensional.")
+        else:
+            raise ValueError("Prediction results for ArrayType must be an ndarray.")
+
+        return pd.Series(list(preds))
+    else:  # scalar
+        if len(preds) != num_input_rows:

Review Comment:
   Please add an additional check that `preds` is one-dimensional, or a two-dimensional array whose second dimension is 1.




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-07 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016162543


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,556 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if isinstance(return_type, StructType):
+        struct_rtype: StructType = return_type
+        fieldNames = struct_rtype.names
+        if isinstance(preds, dict):
+            # dictionary of columns
+            predNames = list(preds.keys())
+            for field in struct_rtype.fields:
+                if isinstance(field.dataType, ArrayType):

Review Comment:
   Add a check: for the other cases (non-ArrayType fields), please verify that the value is one-dimensional.






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-08 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016311501



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-08 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1016730137



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-09 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1018671432


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,602 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _validate_and_transform_multiple_inputs(
+batch: pd.DataFrame, input_shapes: List[List[int] | None], num_input_cols: int
+) -> List[np.ndarray]:
+multi_inputs = [batch[col].to_numpy() for col in batch.columns]
+if input_shapes:
+if len(input_shapes) == num_input_cols:
+multi_inputs = [
+np.vstack(v).reshape([-1] + input_shapes[i])  # type: ignore
+if input_shapes[i]
+else v
+for i, v in enumerate(multi_inputs)
+]
+if not all([len(x) == len(batch) for x in multi_inputs]):
+raise ValueError("Input data does not match expected shape.")
+else:
+raise ValueError("input_tensor_shapes must match columns")
+
+return multi_inputs
+
+
+def _validate_and_transform_single_input(
+batch: pd.DataFrame,
+input_shapes: List[List[int] | None],
+has_tensors: bool,
+has_tuple: bool,
+) -> np.ndarray:
+# multiple input columns for single expected input
+if has_tensors:
+# tensor columns
+if len(batch.columns) == 1:
+# one tensor column and one expected input, vstack rows
+single_input = np.vstack(batch.iloc[:, 0])
+else:
+raise ValueError(
+"Multiple input columns found, but model expected a single "
+"input, use `struct` or `array` to combine columns into 
tensors."
+)
+else:
+# scalar columns
+if len(batch.columns) == 1:
+# single scalar column, remove extra dim
+single_input = np.squeeze(batch.to_numpy())
+if input_shapes and input_shapes[0] not in [None, [], [1]]:
+raise ValueError("Invalid input_tensor_shape for scalar 
column.")
+elif not has_tuple:
+# columns grouped via struct/array, convert to single tensor
+single_input = batch.to_numpy()
+if input_shapes and input_shapes[0] != [len(batch.columns)]:
+raise ValueError("Input data does not match expected shape.")
+else:
+raise ValueError(
+"Multiple input columns found, but model expected a single "
+"input, use `struct` or `array` to combine columns into 
tensors."
+)
+
+# if input_tensor_shapes provided, try to reshape input
+if input_shapes:
+if len(input_shapes) == 1:
+single_input = single_input.reshape([-1] + input_shapes[0])  # type: ignore
+if len(single_input) != len(batch):
+raise ValueError("Input data does not match expected shape.")
+else:
+raise ValueError("Multiple input_tensor_shapes found, but model 
expected one input")
+
+return single_input
+
+
+def _validate_and_transform_prediction_result(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+"""Validate numpy-based model predictions against the expected pandas_udf 
return_type and
+transforms the predictions into an equivalent pandas DataFrame or 
Series."""
+ 

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020309114


##
python/pyspark/ml/model_cache.py:
##
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections import OrderedDict
+from threading import Lock
+from typing import Callable, Optional
+from uuid import UUID
+
+
+class ModelCache:
+"""Cache for model prediction functions on executors."""
+
+_models: OrderedDict[UUID, Callable] = OrderedDict()
+_capacity: int = 8
+_lock: Lock = Lock()
+
+@staticmethod
+def add(uuid: UUID, predict_fn: Callable) -> None:
+ModelCache._lock.acquire()
+ModelCache._models[uuid] = predict_fn
+ModelCache._models.move_to_end(uuid)
+if len(ModelCache._models) > ModelCache._capacity:
+ModelCache._models.popitem(last=False)
+ModelCache._lock.release()

Review Comment:
   Please use `with lock: ...`; otherwise, if an exception is raised, the lock might stay locked forever.
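
   For reference, a minimal sketch of the suggested fix (assuming the class layout quoted above), using a `with` block so the lock is released on every exit path:

   ```python
   @staticmethod
   def add(uuid: UUID, predict_fn: Callable) -> None:
       # The context manager releases the lock even if an exception is
       # raised inside the critical section.
       with ModelCache._lock:
           ModelCache._models[uuid] = predict_fn
           ModelCache._models.move_to_end(uuid)  # mark as most recently used
           if len(ModelCache._models) > ModelCache._capacity:
               ModelCache._models.popitem(last=False)  # evict least recently used
   ```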



##
python/pyspark/ml/model_cache.py:
##
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections import OrderedDict
+from threading import Lock
+from typing import Callable, Optional
+from uuid import UUID
+
+
+class ModelCache:
+"""Cache for model prediction functions on executors."""
+
+_models: OrderedDict[UUID, Callable] = OrderedDict()
+_capacity: int = 8
+_lock: Lock = Lock()
+
+@staticmethod
+def add(uuid: UUID, predict_fn: Callable) -> None:
+ModelCache._lock.acquire()
+ModelCache._models[uuid] = predict_fn
+ModelCache._models.move_to_end(uuid)
+if len(ModelCache._models) > ModelCache._capacity:
+ModelCache._models.popitem(last=False)
+ModelCache._lock.release()
+
+@staticmethod
+def get(uuid: UUID) -> Optional[Callable]:
+ModelCache._lock.acquire()
+predict_fn = ModelCache._models.get(uuid)
+if predict_fn:
+ModelCache._models.move_to_end(uuid)
+ModelCache._lock.release()

Review Comment:
   Ditto: use `with lock: ...` here as well.
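
   The same pattern for `get`, sketched under the same assumptions (the trailing `return predict_fn` is presumed from the truncated context above):

   ```python
   @staticmethod
   def get(uuid: UUID) -> Optional[Callable]:
       with ModelCache._lock:
           predict_fn = ModelCache._models.get(uuid)
           if predict_fn:
               ModelCache._models.move_to_end(uuid)  # refresh LRU position
           return predict_fn
   ```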



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020331415


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,601 @@ def array_to_vector(col: Column) -> Column:
 return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020332942


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,601 @@ def array_to_vector(col: Column) -> Column:
 return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020333239


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,601 @@ def array_to_vector(col: Column) -> Column:
 return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020344325


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,601 @@ def array_to_vector(col: Column) -> Column:
 return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 


[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020345415


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,601 @@ def array_to_vector(col: Column) -> Column:
 return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-11 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1020654939


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +138,602 @@ def array_to_vector(col: Column) -> Column:
 return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 