ryanthompson591 commented on code in PR #23456:
URL: https://github.com/apache/beam/pull/23456#discussion_r992348524


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -16,12 +16,12 @@
 #
 """
-A pipeline tha uses TFX RunInference API to perform Image classification.
+A pipeline that uses TFX RunInference API to perform image classification.
 Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
-Note: For the Tensorflow Model, it needs to be updated with a @tf.function
-      Signature to accept bytes as inputs and should have logic to decode
-      bytes to data which should be acceptable by the tensorflow model.
+Note: A Tensorflow Model needs to be updated with a @tf.function
+      Signature in order to accept bytes as inputs, and should have logic to decode

Review Comment:
   s/Signature/signature

   So this example expects some sort of pretrained model that we then update with a signature? This seems messy. Is this the only way to use TF models?


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):
+  def __init__(
+      self,
+      model,
+      preprocess_input=None,
+      input_dtype=tf.float32,
+      feature_description=None):
+    super().__init__()
+    self.model = model
+    self.preprocess_input = preprocess_input
+    self.input_dtype = input_dtype
+    self.feature_description = feature_description
+    if not feature_description:
+      self.feature_description = {'image': tf.io.FixedLenFeature((), tf.string)}
+
+  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
+  def call(self, serialized_examples):
+    features = tf.io.parse_example(
+        serialized_examples, features=self.feature_description)
+
+    # using TensorArray as suggested at
+    # https://github.com/tensorflow/tensorflow/issues/39323#issuecomment-627586602
+    batch = len(features['image'])
+    deserialized_vectors = tf.TensorArray(
+        self.input_dtype, size=batch, dynamic_size=True)
+    # issue arise with the indexing at features['image']
+    # Vectorized version of tf.io.parse_tensor is not available
+    # https://github.com/tensorflow/tensorflow/issues/43706
+    for i in range(batch):
+      deserialized_value = tf.io.parse_tensor(
+          features['image'][i], out_type=self.input_dtype)
+
+      # http://github.com/tensorflow/tensorflow/issues/30409#issuecomment-508962873
+      # In Graph mode, return value must get assigned in order to
+      # update the array
+      deserialized_vectors = deserialized_vectors.write(i, deserialized_value)
+
+    # deserialized_value = tf.expand_dims(deserialized_vectors, axis=0)

Review Comment:
   Do we want to keep this commented-out code in?


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):
+  def __init__(
+      self,
+      model,
+      preprocess_input=None,
+      input_dtype=tf.float32,
+      feature_description=None):
+    super().__init__()
+    self.model = model
+    self.preprocess_input = preprocess_input
+    self.input_dtype = input_dtype
+    self.feature_description = feature_description
+    if not feature_description:
+      self.feature_description = {'image': tf.io.FixedLenFeature((), tf.string)}
+
+  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
+  def call(self, serialized_examples):
+    features = tf.io.parse_example(
+        serialized_examples, features=self.feature_description)
+
+    # using TensorArray as suggested at
+    # https://github.com/tensorflow/tensorflow/issues/39323#issuecomment-627586602
+    batch = len(features['image'])
+    deserialized_vectors = tf.TensorArray(
+        self.input_dtype, size=batch, dynamic_size=True)
+    # issue arise with the indexing at features['image']

Review Comment:
   This comment was strange. I think all you're trying to say is:

   ```
   # Use a for loop to vectorize the tensor.
   # https://github.com/tensorflow/tensorflow/issues/43706
   ```


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):
+  def __init__(
+      self,
+      model,
+      preprocess_input=None,
+      input_dtype=tf.float32,
+      feature_description=None):
+    super().__init__()
+    self.model = model
+    self.preprocess_input = preprocess_input
+    self.input_dtype = input_dtype
+    self.feature_description = feature_description
+    if not feature_description:
+      self.feature_description = {'image': tf.io.FixedLenFeature((), tf.string)}
+
+  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
+  def call(self, serialized_examples):
+    features = tf.io.parse_example(
+        serialized_examples, features=self.feature_description)
+
+    # using TensorArray as suggested at
+    # https://github.com/tensorflow/tensorflow/issues/39323#issuecomment-627586602
+    batch = len(features['image'])
+    deserialized_vectors = tf.TensorArray(
+        self.input_dtype, size=batch, dynamic_size=True)
+    # issue arise with the indexing at features['image']
+    # Vectorized version of tf.io.parse_tensor is not available
+    # https://github.com/tensorflow/tensorflow/issues/43706
+    for i in range(batch):
+      deserialized_value = tf.io.parse_tensor(
+          features['image'][i], out_type=self.input_dtype)
+
+      # http://github.com/tensorflow/tensorflow/issues/30409#issuecomment-508962873
+      # In Graph mode, return value must get assigned in order to
+      # update the array
+      deserialized_vectors = deserialized_vectors.write(i, deserialized_value)
+
+    # deserialized_value = tf.expand_dims(deserialized_vectors, axis=0)
+    deserialized_tensor = deserialized_vectors.stack()
+    if self.preprocess_input:
+      deserialized_tensor = self.preprocess_input(deserialized_tensor)
+    return self.model(deserialized_tensor, training=False)
+
+
+def save_tf_model(path, model=None, preprocess_input=None):
+  if not model:
+    model = tf.keras.applications.MobileNetV2(weights='imagenet')
+    preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
+  signature_model = TFModelWrapperWithSignature(
+      model=model, preprocess_input=preprocess_input)
+  tf.saved_model.save(signature_model, path)

Review Comment:
   I would add a run method at the bottom of this file that allows you to run this sample code. It's unclear how to use this otherwise.


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline that uses TFX RunInference API to perform image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
+
+Note: A Tensorflow Model needs to be updated with a @tf.function
+      Signature in order to accept bytes as inputs, and should have logic to decode
+      bytes to data that is acceptable by the TensorFlow model.
+      Please take a look at build_tensorflow_model.py on how to modify
+      TF Model's signature.
+"""
+
+import argparse
+import io
+import logging
+import os
+from typing import Iterable
+from typing import Iterator
+from typing import Optional
+from typing import Tuple
+
+import apache_beam as beam
+import tensorflow as tf
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from PIL import Image
+from tfx_bsl.public.beam.run_inference import CreateModelHandler
+from tfx_bsl.public.beam.run_inference import prediction_log_pb2
+from tfx_bsl.public.proto import model_spec_pb2
+
+_IMG_SIZE = (224, 224)
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+def read_and_process_image(
+    image_file_name: str,
+    path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+  # Note: Converts the image dtype from uint8 to float32
+  # https://www.tensorflow.org/api_docs/python/tf/image/resize
+  image = tf.keras.preprocessing.image.img_to_array(data)
+  image = tf.image.resize(image, _IMG_SIZE)
+  return image_file_name, image
+
+
+def convert_image_to_example_proto(tensor):
+  """
+  This method performs the following:
+  1. Accepts the tensor as input
+  2. Serializes the tensor into bytes and pass it through
+     tf.train.Feature
+  3. Pass the serialized tensor feature using tf.train.Example
+     Proto to the RunInference transform.
+
+  Args:
+    tensor: A TF tensor.
+  Returns:
+    example_proto: A tf.train.Example containing serialized tensor.
+  """
+  serialized_non_scalar = tf.io.serialize_tensor(tensor)
+  feature_of_bytes = tf.train.Feature(
+      bytes_list=tf.train.BytesList(value=[serialized_non_scalar.numpy()]))
+  features_for_example = {'image': feature_of_bytes}
+  example_proto = tf.train.Example(
+      features=tf.train.Features(feature=features_for_example))
+  return example_proto
+
+
+class ProcessInferenceToString(beam.DoFn):
+  def process(
+      self, element: Tuple[str,
+                           prediction_log_pb2.PredictionLog]) -> Iterable[str]:
+    """
+    Args:
+      element: Tuple of str, and PredictionLog. Inference can be parsed
+        from prediction_log
+    returns:
+      str of filename and inference.
+    """
+    filename, predict_log = element[0], element[1].predict_log
+    output_value = predict_log.response.outputs
+    output_tensor = (
+        tf.io.decode_raw(
+            output_value['output_0'].tensor_content, out_type=tf.float32))
+    max_index_output_tensor = tf.math.argmax(output_tensor, axis=0)
+    yield filename + ',' + str(tf.get_static_value(max_index_output_tensor))
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path where to save output predictions.'
+      ' text file.')

Review Comment:
   Fix the help text. As written, the two concatenated strings read "Path where to save output predictions. text file."


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tfx_bsl_inference_it_test.py:
##########
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+import uuid
+
+import pytest
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# pylint: disable=ungrouped-imports
+try:
+  import tfx_bsl
+  from apache_beam.examples.inference.tfx_bsl import tensorflow_image_classification
+except ImportError as e:
+  tfx_bsl = None
+# pylint: disable=line-too-long
+_EXPECTED_OUTPUTS = {
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005001.JPEG': '681',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005002.JPEG': '333',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005003.JPEG': '711',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005004.JPEG': '286',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005005.JPEG': '445',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005006.JPEG': '288',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005007.JPEG': '880',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005008.JPEG': '534',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005009.JPEG': '888',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005010.JPEG': '996',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005011.JPEG': '327',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005012.JPEG': '573'
+}
+
+
+def process_outputs(filepath):

Review Comment:
   This might not be part of this PR, but are we doing a similar thing elsewhere? It might be worth thinking about making a utility method or inherited class at some point.


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):

Review Comment:
   Can you comment that into the code somehow? It wouldn't be apparent to me what its purpose is if I were just looking at this code. I mean, this whole module is not used anywhere else as far as I can tell. If I understand correctly, it would be something like:

   ```
   # This demonstrates how to build and save the model used in tensorflow_image_classification.py.
   # To prepare a model you will need:
   # 1. Instruction 1
   # 2. Instruction 2 ....
   # A saved model should be placed in GCS and then can be referenced in ...
   ```

   TFModelWrapperWithSignature( ...


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/tensorflow_image_classification.py:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline that uses TFX RunInference API to perform image classification.
+Please look at https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam.
+
+Note: A Tensorflow Model needs to be updated with a @tf.function
+      Signature in order to accept bytes as inputs, and should have logic to decode
+      bytes to data that is acceptable by the TensorFlow model.
+      Please take a look at build_tensorflow_model.py on how to modify
+      TF Model's signature.
+"""
+
+import argparse
+import io
+import logging
+import os
+from typing import Iterable
+from typing import Iterator
+from typing import Optional
+from typing import Tuple
+
+import apache_beam as beam
+import tensorflow as tf
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from PIL import Image
+from tfx_bsl.public.beam.run_inference import CreateModelHandler
+from tfx_bsl.public.beam.run_inference import prediction_log_pb2
+from tfx_bsl.public.proto import model_spec_pb2
+
+_IMG_SIZE = (224, 224)
+
+
+def filter_empty_lines(text: str) -> Iterator[str]:
+  if len(text.strip()) > 0:
+    yield text
+
+
+def read_and_process_image(
+    image_file_name: str,
+    path_to_dir: Optional[str] = None) -> Tuple[str, tf.Tensor]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+  # Note: Converts the image dtype from uint8 to float32
+  # https://www.tensorflow.org/api_docs/python/tf/image/resize
+  image = tf.keras.preprocessing.image.img_to_array(data)
+  image = tf.image.resize(image, _IMG_SIZE)
+  return image_file_name, image
+
+
+def convert_image_to_example_proto(tensor):
+  """
+  This method performs the following:
+  1. Accepts the tensor as input
+  2. Serializes the tensor into bytes and pass it through
+     tf.train.Feature
+  3. Pass the serialized tensor feature using tf.train.Example
+     Proto to the RunInference transform.
+
+  Args:
+    tensor: A TF tensor.
+  Returns:
+    example_proto: A tf.train.Example containing serialized tensor.
+  """
+  serialized_non_scalar = tf.io.serialize_tensor(tensor)
+  feature_of_bytes = tf.train.Feature(
+      bytes_list=tf.train.BytesList(value=[serialized_non_scalar.numpy()]))
+  features_for_example = {'image': feature_of_bytes}
+  example_proto = tf.train.Example(
+      features=tf.train.Features(feature=features_for_example))
+  return example_proto
+
+
+class ProcessInferenceToString(beam.DoFn):
+  def process(
+      self, element: Tuple[str,
+                           prediction_log_pb2.PredictionLog]) -> Iterable[str]:
+    """
+    Args:
+      element: Tuple of str, and PredictionLog. Inference can be parsed
+        from prediction_log
+    returns:
+      str of filename and inference.
+    """
+    filename, predict_log = element[0], element[1].predict_log
+    output_value = predict_log.response.outputs
+    output_tensor = (
+        tf.io.decode_raw(
+            output_value['output_0'].tensor_content, out_type=tf.float32))
+    max_index_output_tensor = tf.math.argmax(output_tensor, axis=0)
+    yield filename + ',' + str(tf.get_static_value(max_index_output_tensor))
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_path',
+      dest='model_path',
+      required=True,
+      help="Path to the model.")
+  parser.add_argument(
+      '--images_dir',
+      default=None,
+      help='Path to the directory where images are stored.'
+      'Not required if image names in the input file have absolute path.')
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    save_main_session: Used for internal testing.
+    test_pipeline: Used for internal testing.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  saved_model_spec = model_spec_pb2.SavedModelSpec(
+      model_path=known_args.model_path)
+  inferece_spec_type = model_spec_pb2.InferenceSpecType(
+      saved_model_spec=saved_model_spec)
+  model_handler = CreateModelHandler(inferece_spec_type)
+  # create a KeyedModelHandler to accommodate image names as keys.
+  keyed_model_handler = KeyedModelHandler(model_handler)
+
+  pipeline = test_pipeline
+  if not test_pipeline:
+    pipeline = beam.Pipeline(options=pipeline_options)
+
+  filename_value_pair = (
+      pipeline
+      | 'ReadImageNames' >> beam.io.ReadFromText(known_args.input)
+      | 'FilterEmptyLines' >> beam.ParDo(filter_empty_lines)

Review Comment:
   Is it possible to have cleaned input as our example input, instead of needing to clean it in our sample code?


##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):
+  def __init__(
+      self,
+      model,
+      preprocess_input=None,
+      input_dtype=tf.float32,
+      feature_description=None):
+    super().__init__()
+    self.model = model
+    self.preprocess_input = preprocess_input
+    self.input_dtype = input_dtype
+    self.feature_description = feature_description
+    if not feature_description:
+      self.feature_description = {'image': tf.io.FixedLenFeature((), tf.string)}
+
+  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
+  def call(self, serialized_examples):
+    features = tf.io.parse_example(
+        serialized_examples, features=self.feature_description)
+
+    # using TensorArray as suggested at
+    # https://github.com/tensorflow/tensorflow/issues/39323#issuecomment-627586602
+    batch = len(features['image'])
+    deserialized_vectors = tf.TensorArray(
+        self.input_dtype, size=batch, dynamic_size=True)
+    # issue arise with the indexing at features['image']
+    # Vectorized version of tf.io.parse_tensor is not available
+    # https://github.com/tensorflow/tensorflow/issues/43706
+    for i in range(batch):
+      deserialized_value = tf.io.parse_tensor(
+          features['image'][i], out_type=self.input_dtype)
+
+      # http://github.com/tensorflow/tensorflow/issues/30409#issuecomment-508962873
+      # In Graph mode, return value must get assigned in order to
+      # update the array
+      deserialized_vectors = deserialized_vectors.write(i, deserialized_value)
+
+    # deserialized_value = tf.expand_dims(deserialized_vectors, axis=0)
+    deserialized_tensor = deserialized_vectors.stack()
+    if self.preprocess_input:
+      deserialized_tensor = self.preprocess_input(deserialized_tensor)
+    return self.model(deserialized_tensor, training=False)
+
+
+def save_tf_model(path, model=None, preprocess_input=None):
+  if not model:

Review Comment:
   Is the purpose of passing in a model to continue training on a partially trained model?

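For context on the question above, here is a minimal sketch of how the defaulting branch in `save_tf_model` appears to behave. It is not the PR's code: `resolve_model`, `DEFAULT_MODEL`, `default_preprocess`, and `custom_preprocess` are hypothetical stand-ins so the snippet runs without TensorFlow. Under that reading, `model` looks like a way to wrap a different pretrained model rather than to continue training, and a `preprocess_input` passed without a `model` is replaced by the default.

```python
# Hypothetical stand-ins for tf.keras.applications.MobileNetV2(weights='imagenet')
# and tf.keras.applications.mobilenet_v2.preprocess_input.
DEFAULT_MODEL = 'MobileNetV2'


def default_preprocess(batch):
  return batch


def custom_preprocess(batch):
  return batch


def resolve_model(model=None, preprocess_input=None):
  """Mirrors the `if not model:` branch of save_tf_model (an assumed reading)."""
  if not model:
    # Both values are replaced together, so a caller-supplied
    # preprocess_input is discarded whenever model is missing.
    model = DEFAULT_MODEL
    preprocess_input = default_preprocess
  return model, preprocess_input


# No arguments: the defaults are used.
print(resolve_model() == (DEFAULT_MODEL, default_preprocess))
# Only preprocess_input given: it is overwritten by the default.
print(resolve_model(preprocess_input=custom_preprocess)[1] is default_preprocess)
# Both given: the caller's values are kept.
print(resolve_model('my_model', custom_preprocess) == ('my_model', custom_preprocess))
```

If that overwrite is not intended, checking `preprocess_input is None` separately from `model` would keep the caller's function.
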

##########
sdks/python/apache_beam/examples/inference/tfx_bsl/build_tensorflow_model.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tensorflow as tf
+
+
+class TFModelWrapperWithSignature(tf.keras.Model):
+  def __init__(
+      self,
+      model,
+      preprocess_input=None,
+      input_dtype=tf.float32,
+      feature_description=None):
+    super().__init__()
+    self.model = model
+    self.preprocess_input = preprocess_input
+    self.input_dtype = input_dtype
+    self.feature_description = feature_description
+    if not feature_description:
+      self.feature_description = {'image': tf.io.FixedLenFeature((), tf.string)}
+
+  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
+  def call(self, serialized_examples):
+    features = tf.io.parse_example(
+        serialized_examples, features=self.feature_description)
+
+    # using TensorArray as suggested at

Review Comment:
   This comment just seems out of place. Maybe make it more specific. For example (and I don't know if this is right or not):

   ```
   # A TensorArray is a set of all our input data. It is structured in this fashion:
   #   a width x height x color matrix of float32 values representing the color value for each pixel.
   # See https://github.com/tensorflow/tensorflow/issues/39323#issuecomment-627586602
   # for more discussion on this setup.
   ```
