AnandInguva commented on code in PR #23456: URL: https://github.com/apache/beam/pull/23456#discussion_r1001018365
########## sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py: ########## @@ -0,0 +1,195 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A pipeline that uses TFX RunInference API to perform Image classification. +Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam. + +Note: For the Tensorflow Model, it needs to be updated with a @tf.function + Signature to accept bytes as inputs and should have logic to decode + bytes to data which should be acceptable by the tensorflow model. + Please take a look at build_tensorflow_model.py on how to modify + TF Model's signature. 
+""" + +import argparse +import io +import logging +import os +from typing import Iterable +from typing import Iterator +from typing import Optional +from typing import Tuple + +import apache_beam as beam +import tensorflow as tf +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import RunInference +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.runners.runner import PipelineResult +from PIL import Image +from tfx_bsl.public.beam.run_inference import CreateModelHandler +from tfx_bsl.public.beam.run_inference import prediction_log_pb2 +from tfx_bsl.public.proto import model_spec_pb2 + +_IMG_SIZE = (224, 224) + + +def filter_empty_lines(text: str) -> Iterator[str]: + if len(text.strip()) > 0: + yield text + + +def read_and_process_image( + image_file_name: str, + path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]: + if path_to_dir is not None: + image_file_name = os.path.join(path_to_dir, image_file_name) + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + # Note: Converts the image dtype from uint8 to float32 + # https://www.tensorflow.org/api_docs/python/tf/image/resize + image = tf.keras.preprocessing.image.img_to_array(data) + image = tf.image.resize(image, _IMG_SIZE) + return image_file_name, image + + +def convert_image_to_example_proto(tensor): Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
