yeandy commented on code in PR #23456:
URL: https://github.com/apache/beam/pull/23456#discussion_r990242890


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline tha uses TFX RunInference API to perform Image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
+
+Note: For the Tensorflow Model, it needs to be updated with a @tf.function
+      Signature to accept bytes as inputs and should have logic to decode
+      bytes to data which should be acceptable by the tensorflow model.
+      Please take a look at build_tensorflow_model.py on how to modify
+      TF Model's signature.
+"""
+
+import argparse
+import io
+import logging
+import os
+from typing import Iterable
+from typing import Iterator
+from typing import Optional
+from typing import Tuple
+
+import apache_beam as beam
+import tensorflow as tf
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from PIL import Image
+from tfx_bsl.public.beam.run_inference import CreateModelHandler
+from tfx_bsl.public.beam.run_inference import prediction_log_pb2
+from tfx_bsl.public.proto import model_spec_pb2
+
+_IMG_SIZE = (224, 224)
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+def read_and_process_image(
+    image_file_name: str,
+    path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+  # Note: Converts the image dtype from uint8 to float32
+  # https://www.tensorflow.org/api_docs/python/tf/image/resize
+  image = tf.keras.preprocessing.image.img_to_array(data)
+  image = tf.image.resize(image, _IMG_SIZE)
+  return image_file_name, image
+
+
+def convert_image_to_example_proto(tensor):
+  """
+  This method performs the following:
+  1. Accepts the tensor as input
+  2. Serializes the tensor into bytes and pass it through
+        tf.train.Feature
+  3. Pass the serialized tensor feature using tf.train.Example
+      Proto to the RunInference transform.
+
+  Args:
+    tensor: A TF tensor.
+  Returns:
+    example_proto: A tf.train.Example containing serialized tensor.
+  """
+  serialized_non_scalar = tf.io.serialize_tensor(tensor)
+  feature_of_bytes = tf.train.Feature(
+      bytes_list=tf.train.BytesList(value=[serialized_non_scalar.numpy()]))
+  features_for_example = {'image': feature_of_bytes}
+  example_proto = tf.train.Example(
+      features=tf.train.Features(feature=features_for_example))
+  return example_proto
+
+
+class ProcessInferenceToString(beam.DoFn):
+  def process(
+      self, element: Tuple[str,
+                           prediction_log_pb2.PredictionLog]) -> Iterable[str]:
+    """
+    Args:
+      element: Tuple of str, and PredictionLog. Inference can be parsed
+        from prediction_log
+    returns:
+      str of filename and inference.
+    """
+    filename, predict_log = element[0], element[1].predict_log
+    output_value = predict_log.response.outputs
+    output_tensor = (
+        tf.io.decode_raw(
+            output_value['output_0'].tensor_content, out_type=tf.float32))
+    max_index_output_tensor = tf.math.argmax(output_tensor, axis=0)
+    yield filename + ',' + str(tf.get_static_value(max_index_output_tensor))
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_path',
+      dest='model_path',
+      required=True,
+      help="Path to the model.")
+  parser.add_argument(
+      '--images_dir',
+      default=None,
+      help='Path to the directory where images are stored.'
+      'Not required if image names in the input file have absolute path.')
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  saved_model_spec = model_spec_pb2.SavedModelSpec(
+      model_path=known_args.model_path)
+  inferece_spec_type = model_spec_pb2.InferenceSpecType(
+      saved_model_spec=saved_model_spec)
+  model_handler = CreateModelHandler(inferece_spec_type)
+  # create a keyedModelHandler to accommodate image names as keys.
+  keyed_model_handler = KeyedModelHandler(model_handler)
+
+  pipeline = test_pipeline
+  if not test_pipeline:
+    pipeline = beam.Pipeline(options=pipeline_options)

Review Comment:
   Could we rename the `test_pipeline` parameter to `pipeline` (line 144) so we don't need the extra `pipeline = test_pipeline` assignment?



##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):
+  def __init__(
+      self,
+      model,
+      preprocess_input=None,
+      input_dtype=tf.float32,
+      feature_description=None):
+    super().__init__()
+    self.model = model
+    self.preprocess_input = preprocess_input
+    self.input_dtype = input_dtype
+    self.feature_description = feature_description
+    if not feature_description:
+      self.feature_description = {'image': tf.io.FixedLenFeature((), 
tf.string)}
+
+  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
+  def call(self, serialized_examples):
+    features = tf.io.parse_example(
+        serialized_examples, features=self.feature_description)
+
+    # using TensorArray as suggested at
+    # 
https://github.com/tensorflow/tensorflow/issues/39323#issuecomment-627586602
+    batch = len(features['image'])
+    deserialized_vectors = tf.TensorArray(
+        self.input_dtype, size=batch, dynamic_size=True)
+    # issue arise with the indexing at features['image']
+    # Vectorized version of tf.io.parse_tensor is not available
+    # https://github.com/tensorflow/tensorflow/issues/43706
+    for i in range(batch):
+      deserialized_value = tf.io.parse_tensor(
+          features['image'][i], out_type=self.input_dtype)
+
+      # 
http://github.com/tensorflow/tensorflow/issues/30409#issuecomment-508962873
+      # In Graph mode, return value must get assigned in order to
+      # update the array
+      deserialized_vectors = deserialized_vectors.write(i, deserialized_value)
+
+    # deserialized_value = tf.expand_dims(deserialized_vectors, axis=0)
+    deserialized_tensor = deserialized_vectors.stack()
+    if self.preprocess_input:
+      deserialized_tensor = self.preprocess_input(deserialized_tensor)
+    return self.model(deserialized_tensor, training=False)
+
+
+def save_tf_model(path, model=None, preprocess_input=None):

Review Comment:
   This doesn't actually get run anywhere, right? It looks like it is just a reference for users. Are there any instructions on how to use `save_tf_model`?



##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline tha uses TFX RunInference API to perform Image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
+
+Note: For the Tensorflow Model, it needs to be updated with a @tf.function
+      Signature to accept bytes as inputs and should have logic to decode
+      bytes to data which should be acceptable by the tensorflow model.
+      Please take a look at build_tensorflow_model.py on how to modify
+      TF Model's signature.
+"""
+
+import argparse
+import io
+import logging
+import os
+from typing import Iterable
+from typing import Iterator
+from typing import Optional
+from typing import Tuple
+
+import apache_beam as beam
+import tensorflow as tf
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from PIL import Image
+from tfx_bsl.public.beam.run_inference import CreateModelHandler
+from tfx_bsl.public.beam.run_inference import prediction_log_pb2
+from tfx_bsl.public.proto import model_spec_pb2
+
+_IMG_SIZE = (224, 224)
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+def read_and_process_image(
+    image_file_name: str,
+    path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+  # Note: Converts the image dtype from uint8 to float32
+  # https://www.tensorflow.org/api_docs/python/tf/image/resize
+  image = tf.keras.preprocessing.image.img_to_array(data)
+  image = tf.image.resize(image, _IMG_SIZE)
+  return image_file_name, image
+
+
+def convert_image_to_example_proto(tensor):
+  """
+  This method performs the following:
+  1. Accepts the tensor as input
+  2. Serializes the tensor into bytes and pass it through
+        tf.train.Feature
+  3. Pass the serialized tensor feature using tf.train.Example
+      Proto to the RunInference transform.
+
+  Args:
+    tensor: A TF tensor.
+  Returns:
+    example_proto: A tf.train.Example containing serialized tensor.
+  """
+  serialized_non_scalar = tf.io.serialize_tensor(tensor)
+  feature_of_bytes = tf.train.Feature(
+      bytes_list=tf.train.BytesList(value=[serialized_non_scalar.numpy()]))
+  features_for_example = {'image': feature_of_bytes}
+  example_proto = tf.train.Example(
+      features=tf.train.Features(feature=features_for_example))
+  return example_proto
+
+
+class ProcessInferenceToString(beam.DoFn):
+  def process(
+      self, element: Tuple[str,
+                           prediction_log_pb2.PredictionLog]) -> Iterable[str]:
+    """
+    Args:
+      element: Tuple of str, and PredictionLog. Inference can be parsed
+        from prediction_log
+    returns:
+      str of filename and inference.
+    """
+    filename, predict_log = element[0], element[1].predict_log
+    output_value = predict_log.response.outputs
+    output_tensor = (
+        tf.io.decode_raw(
+            output_value['output_0'].tensor_content, out_type=tf.float32))
+    max_index_output_tensor = tf.math.argmax(output_tensor, axis=0)
+    yield filename + ',' + str(tf.get_static_value(max_index_output_tensor))
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_path',
+      dest='model_path',
+      required=True,
+      help="Path to the model.")
+  parser.add_argument(
+      '--images_dir',
+      default=None,
+      help='Path to the directory where images are stored.'
+      'Not required if image names in the input file have absolute path.')
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  saved_model_spec = model_spec_pb2.SavedModelSpec(
+      model_path=known_args.model_path)
+  inferece_spec_type = model_spec_pb2.InferenceSpecType(
+      saved_model_spec=saved_model_spec)
+  model_handler = CreateModelHandler(inferece_spec_type)
+  # create a keyedModelHandler to accommodate image names as keys.

Review Comment:
   nit
   ```suggestion
     # create a KeyedModelHandler to accommodate image names as keys.
   ```



##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline tha uses TFX RunInference API to perform Image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.

Review Comment:
   ```suggestion
   A pipeline that uses TFX RunInference API to perform image classification.
   Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
   ```



##########
sdks/python/test-suites/direct/common.gradle:
##########
@@ -233,10 +234,38 @@ task sklearnInferenceTest {
     }
 }
 
+// Scikit-learn RunInference IT tests

Review Comment:
   ```suggestion
   // TensorFlow Extended (TFX) RunInference IT tests
   ```



##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline tha uses TFX RunInference API to perform Image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
+
+Note: For the Tensorflow Model, it needs to be updated with a @tf.function
+      Signature to accept bytes as inputs and should have logic to decode
+      bytes to data which should be acceptable by the tensorflow model.
+      Please take a look at build_tensorflow_model.py on how to modify
+      TF Model's signature.
+"""
+
+import argparse
+import io
+import logging
+import os
+from typing import Iterable
+from typing import Iterator
+from typing import Optional
+from typing import Tuple
+
+import apache_beam as beam
+import tensorflow as tf
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from PIL import Image
+from tfx_bsl.public.beam.run_inference import CreateModelHandler
+from tfx_bsl.public.beam.run_inference import prediction_log_pb2
+from tfx_bsl.public.proto import model_spec_pb2
+
+_IMG_SIZE = (224, 224)
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+def read_and_process_image(
+    image_file_name: str,
+    path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+  # Note: Converts the image dtype from uint8 to float32
+  # https://www.tensorflow.org/api_docs/python/tf/image/resize
+  image = tf.keras.preprocessing.image.img_to_array(data)
+  image = tf.image.resize(image, _IMG_SIZE)
+  return image_file_name, image
+
+
+def convert_image_to_example_proto(tensor):

Review Comment:
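   Can we add type hints for the input and return value here? Per the docstring, that would be `tensor: tf.Tensor` and a return type of `tf.train.Example`.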



##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline tha uses TFX RunInference API to perform Image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
+
+Note: For the Tensorflow Model, it needs to be updated with a @tf.function
+      Signature to accept bytes as inputs and should have logic to decode
+      bytes to data which should be acceptable by the tensorflow model.
+      Please take a look at build_tensorflow_model.py on how to modify
+      TF Model's signature.

Review Comment:
   ```suggestion
   Note: A TensorFlow model needs to be updated with a @tf.function
         signature in order to accept bytes as inputs, and should have logic to decode
         bytes to data that is acceptable by the TensorFlow model.
         Please take a look at build_tensorflow_model.py to see how to modify
         the TF model's signature.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to