[https://issues.apache.org/jira/browse/BEAM-14535?focusedWorklogId=777167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-777167]
ASF GitHub Bot logged work on BEAM-14535:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Jun/22 21:05
Start Date: 01/Jun/22 21:05
Worklog Time Spent: 10m
Work Description: yeandy commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r887267918
##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
model_uri=file.name, model_file_type=None)
model_loader.load_model()
+ @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+ def test_pipeline_pandas(self):
+ temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+ with open(temp_file_name, 'wb') as file:
+ pickle.dump(build_pandas_pipeline(), file)
+ with TestPipeline() as pipeline:
+ data_frame = pandas_dataframe()
Review Comment:
nit
```suggestion
dataframe = pandas_dataframe()
```
##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
model_uri=file.name, model_file_type=None)
model_loader.load_model()
+ @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+ def test_pipeline_pandas(self):
+ temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+ with open(temp_file_name, 'wb') as file:
+ pickle.dump(build_pandas_pipeline(), file)
+ with TestPipeline() as pipeline:
+ data_frame = pandas_dataframe()
+
+ pcoll = pipeline | 'start' >> beam.Create([data_frame])
+ actual = pcoll | api.RunInference(
+ SklearnModelLoader(model_uri=temp_file_name))
+
+ splits = [data_frame.loc[[i]] for i in data_frame.index]
+ expected = [
+ api.PredictionResult(splits[0], 5),
+ api.PredictionResult(splits[1], 8),
+ api.PredictionResult(splits[2], 1),
+ api.PredictionResult(splits[3], 1),
+ api.PredictionResult(splits[4], 2),
+ ]
+ assert_that(
+ actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
+
+ @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+ def test_pipeline_pandas_with_keys(self):
Review Comment:
We already have a keyed test in `base_test.py`. Is it necessary to test a
keyed example for each framework?
##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
JOBLIB = 2
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+ pandas.DataFrame],
PredictionResult,
BaseEstimator]):
- def run_inference(self, batch: List[numpy.ndarray],
- model: BaseEstimator) -> Iterable[PredictionResult]:
+ def run_inference(
+ self,
+ batch: List[Union[numpy.ndarray, pandas.DataFrame]],
Review Comment:
I don't think there's a convention pertaining to the order of elements in a
`Union`. Since `pandas.DataFrame` is used more often, would this ordering be
more useful?
```suggestion
batch: List[Union[pandas.DataFrame, numpy.ndarray]],
```
##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
JOBLIB = 2
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+ pandas.DataFrame],
PredictionResult,
BaseEstimator]):
- def run_inference(self, batch: List[numpy.ndarray],
- model: BaseEstimator) -> Iterable[PredictionResult]:
+ def run_inference(
+ self,
+ batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+ model: BaseEstimator) -> Iterable[PredictionResult]:
+ if isinstance(batch[0], numpy.ndarray):
+ return SklearnInferenceRunner._predict_np_array(batch, model)
+ elif isinstance(batch[0], pandas.DataFrame):
+ return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+ @staticmethod
+ def _predict_np_array(batch: List[numpy.ndarray],
+ model: Any) -> Iterable[PredictionResult]:
# vectorize data for better performance
vectorized_batch = numpy.stack(batch, axis=0)
predictions = model.predict(vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
- def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+ @staticmethod
+ def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+ model: Any) -> Iterable[PredictionResult]:
+ # vectorize data for better performance
+ vectorized_batch = pandas.concat(batch, axis=0)
+ predictions = model.predict(vectorized_batch)
+ splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
Review Comment:
I did a toy example like so:
```
>>> df1 = pd.DataFrame([[1,2,3],[4,5,6]])
>>> df2 = pd.DataFrame([[11,12,13],[14,15,16]])
>>> vectorized_batch = pd.concat([df1, df2])
>>> vectorized_batch.index
Int64Index([0, 1, 0, 1], dtype='int64')
>>> vectorized_batch.loc[[0]]
0 1 2
0 1 2 3
0 11 12 13
>>> vectorized_batch.loc[[1]]
0 1 2
1 4 5 6
1 14 15 16
>>> splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
>>> splits
[ 0 1 2
0 1 2 3
0 11 12 13, 0 1 2
1 4 5 6
1 14 15 16, 0 1 2
0 1 2 3
0 11 12 13, 0 1 2
1 4 5 6
1 14 15 16]
```
I see that index 0 is found in both the first and second DataFrame, and
likewise for index 1. So indexing by `vectorized_batch.index` appears to mix
up data from the different DataFrames.
Can you clarify how `vectorized_batch.index` is supposed to differentiate
between the batches?
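One way to see the issue, together with a position-based alternative (a sketch only; the variable names are illustrative, not code from the PR):

```python
import pandas as pd

# Reproduce the collision: two DataFrames that both use the default 0-based index.
df1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]])
df2 = pd.DataFrame([[11, 12, 13], [14, 15, 16]])
vectorized_batch = pd.concat([df1, df2], axis=0)

# Label-based lookup returns every row sharing that label, mixing the inputs.
assert len(vectorized_batch.loc[[0]]) == 2

# A position-based split keeps one row per example regardless of labels.
splits = [vectorized_batch.iloc[[i]] for i in range(len(vectorized_batch))]
assert len(splits) == len(vectorized_batch)
assert splits[2].iloc[0, 0] == 11  # third row really comes from df2
```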
##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -62,6 +83,44 @@ def build_model():
return model
+def pandas_dataframe():
+ csv_string = (
+ 'category_1,number_1,category_2,number_2,label,number_3\n'
Review Comment:
Does this implicitly add indexes starting from 0? What if the user has
custom indexes?
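For reference, `pandas.read_csv` assigns a default `RangeIndex` starting at 0 unless `index_col` is given, so a custom index is easy for a user to end up with (a toy illustration, not code from the PR):

```python
import io

import pandas as pd

csv_string = 'a,b\n1,2\n3,4\n'

# Without index_col, read_csv builds a default RangeIndex starting at 0.
df = pd.read_csv(io.StringIO(csv_string))
assert list(df.index) == [0, 1]

# A caller could just as easily carry a custom index into the pipeline.
df_custom = pd.read_csv(io.StringIO(csv_string), index_col='a')
assert list(df_custom.index) == [1, 3]
```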
##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
model_uri=file.name, model_file_type=None)
model_loader.load_model()
+ @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+ def test_pipeline_pandas(self):
+ temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+ with open(temp_file_name, 'wb') as file:
+ pickle.dump(build_pandas_pipeline(), file)
+ with TestPipeline() as pipeline:
+ data_frame = pandas_dataframe()
+
+ pcoll = pipeline | 'start' >> beam.Create([data_frame])
Review Comment:
So here we have a list of one DataFrame. Can we test a list of two or more
DataFrames, which would simulate reading separate text files into their own
DataFrames?
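Something along these lines, perhaps (a rough sketch; the column names are made up, not taken from the PR's fixtures):

```python
import pandas as pd

# Hypothetical input simulating two files read into separate DataFrames.
df_a = pd.DataFrame({'number_1': [1.0, 2.0], 'number_2': [3.0, 4.0]})
df_b = pd.DataFrame({'number_1': [5.0], 'number_2': [6.0]})
batch = [df_a, df_b]  # e.g. pipeline | beam.Create(batch)

# The runner would concat these internally; note the repeated 0 label,
# which is exactly what a label-based split would trip over.
combined = pd.concat(batch, axis=0)
assert list(combined.index) == [0, 1, 0]
```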
##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
JOBLIB = 2
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+ pandas.DataFrame],
PredictionResult,
BaseEstimator]):
- def run_inference(self, batch: List[numpy.ndarray],
- model: BaseEstimator) -> Iterable[PredictionResult]:
+ def run_inference(
+ self,
+ batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+ model: BaseEstimator) -> Iterable[PredictionResult]:
+ if isinstance(batch[0], numpy.ndarray):
+ return SklearnInferenceRunner._predict_np_array(batch, model)
+ elif isinstance(batch[0], pandas.DataFrame):
+ return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
Review Comment:
Nice refactor
##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
JOBLIB = 2
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+ pandas.DataFrame],
PredictionResult,
BaseEstimator]):
- def run_inference(self, batch: List[numpy.ndarray],
- model: BaseEstimator) -> Iterable[PredictionResult]:
+ def run_inference(
+ self,
+ batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+ model: BaseEstimator) -> Iterable[PredictionResult]:
+ if isinstance(batch[0], numpy.ndarray):
+ return SklearnInferenceRunner._predict_np_array(batch, model)
+ elif isinstance(batch[0], pandas.DataFrame):
+ return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+ @staticmethod
+ def _predict_np_array(batch: List[numpy.ndarray],
+ model: Any) -> Iterable[PredictionResult]:
# vectorize data for better performance
vectorized_batch = numpy.stack(batch, axis=0)
predictions = model.predict(vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
- def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+ @staticmethod
+ def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+ model: Any) -> Iterable[PredictionResult]:
+ # vectorize data for better performance
+ vectorized_batch = pandas.concat(batch, axis=0)
+ predictions = model.predict(vectorized_batch)
+ splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
+ return [
+ PredictionResult(example, inference) for example,
+ inference in zip(splits, predictions)
+ ]
+
+ def get_num_bytes(
+ self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
"""Returns the number of bytes of data for a batch."""
- return sum(sys.getsizeof(element) for element in batch)
+ if isinstance(batch[0], numpy.ndarray):
+ return sum(sys.getsizeof(element) for element in batch)
+ elif isinstance(batch[0], pandas.DataFrame):
+ return sum(df.memory_usage(deep=True).sum() for df in batch)
Review Comment:
What does `deep=True` mean?
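For context (my understanding, not from the PR): with `deep=False`, `memory_usage` counts only the column arrays themselves, so an object-dtype column is just one pointer per element; `deep=True` additionally introspects the contained Python objects (e.g. strings) and adds their real size:

```python
import pandas as pd

# An object-dtype column holding two 100-character strings.
df = pd.DataFrame({'s': ['a' * 100, 'b' * 100]})

shallow = df.memory_usage(deep=False).sum()  # pointers only
deep = df.memory_usage(deep=True).sum()      # plus the string objects

# The deep measurement includes the strings' actual storage.
assert deep > shallow
```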
Issue Time Tracking
-------------------
Worklog Id: (was: 777167)
Time Spent: 1h 20m (was: 1h 10m)
> Add support for Pandas Dataframes to sklearn RunInference Implementation
> ------------------------------------------------------------------------
>
> Key: BEAM-14535
> URL: https://issues.apache.org/jira/browse/BEAM-14535
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Ryan Thompson
> Assignee: Ryan Thompson
> Priority: P2
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Sklearn pipelines are often set up to take pandas dataframes.
>
> Our current implementation only supports numpy arrays.
>
> This FR allows the sklearn implementation to autodetect pandas dataframes or
> numpy arrays and then combine them (via concat).
>
> In the case of a pandas dataframe that value will be passed through to the
> pipeline.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)