damccorm commented on code in PR #24382: URL: https://github.com/apache/beam/pull/24382#discussion_r1061686391
########## .test-infra/jenkins/job_BenchmarkTests_CloudML_Python.groovy: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PhraseTriggeringPostCommitBuilder +import CronJobBuilder + +def cloudMLJob = { scope -> + scope.description('Runs the TFT Criteo Examples on the Dataflow runner.') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(scope) + + // Gradle goals for this job. + scope.steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + commonJobProperties.setGradleSwitches(delegate, 'master', 360) + tasks(':sdks:python:test-suites:dataflow:tftTests') + } + } +} + +CronJobBuilder.cronJob( + 'beam_PostCommit_Python_CloudML_tests', Review Comment: Again, we should probably follow convention ```suggestion 'beam_Inference_Python_Benchmarks_Dataflow', ``` ########## sdks/python/apache_beam/testing/benchmarks/cloudml/cloudml_benchmark_constants_lib.py: ########## @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A common file for CloudML benchmarks. + +This file contains constants for pipeline paths, dependency locations and +test data paths. +""" + +INPUT_CRITEO_SMALL = 'train10.tsv' +INPUT_CRITEO_SMALL_100MB = '100mb/train.txt' +INPUT_CRITEO_10GB = '10gb/train.txt' + +# The model is trained by running the criteo preprocessing and training. +# The input dataset was the Criteo 10GB dataset and frequency_threshold=100 was +# set for categorical features. +# MODEL_CRITEO_10GB = 'testdata/criteo/saved_model' Review Comment: Do we need this comment block? Same for next one ########## sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py: ########## @@ -0,0 +1,160 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Schema and tranform definition for the Criteo dataset.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import tensorflow_transform as tft + + +def _get_raw_categorical_column_name(column_idx): + return 'categorical-feature-{}'.format(column_idx) + + +def get_transformed_categorical_column_name(column_name_or_id): + if isinstance(column_name_or_id, bytes): + # assume the input is column name + column_name = column_name_or_id + else: + # assume the input is column id + column_name = _get_raw_categorical_column_name(column_name_or_id) + return column_name + '_id' + + +_INTEGER_COLUMN_NAMES = [ + 'int-feature-{}'.format(column_idx) for column_idx in range(1, 14) +] +_CATEGORICAL_COLUMN_NAMES = [ + _get_raw_categorical_column_name(column_idx) + for column_idx in range(14, 40) +] +DEFAULT_DELIMITER = '\t' +# Number of buckets for integer columns. +_NUM_BUCKETS = 10 + +# Schema annotations aren't supported in this build. +tft.common.IS_ANNOTATIONS_PB_AVAILABLE = False + + +def make_ordered_column_names(include_label=True): + """Returns the column names in the dataset in the order as they appear. + + Args: + include_label: Indicates whether the label feature should be included. + Returns: + A list of column names in the dataset. 
+ """ + result = ['clicked'] if include_label else [] + for name in _INTEGER_COLUMN_NAMES: + result.append(name) + for name in _CATEGORICAL_COLUMN_NAMES: + result.append(name) + return result + + +# TODO(b/115566921): Remove this once models have been updated for Lantern, TFMA +# and BatchPrediction. +def make_legacy_input_feature_spec(include_label=True): + """Input schema definition. + + Args: + include_label: Indicates whether the label feature should be included. + Returns: + A `Schema` object. + """ + result = {} + if include_label: + result['clicked'] = tf.io.FixedLenFeature(shape=[], dtype=tf.int64) + for name in _INTEGER_COLUMN_NAMES: + result[name] = tf.io.FixedLenFeature( + shape=[], dtype=tf.int64, default_value=-1) + for name in _CATEGORICAL_COLUMN_NAMES: + result[name] = tf.io.FixedLenFeature( + shape=[], dtype=tf.string, default_value='') + return result + + +def make_input_feature_spec(include_label=True): + """Input schema definition. + + Args: + include_label: Indicates whether the label feature should be included. + + Returns: + A `Schema` object. + """ + result = {} + if include_label: + result['clicked'] = tf.io.FixedLenFeature(shape=[], dtype=tf.int64) + for name in _INTEGER_COLUMN_NAMES: + result[name] = tf.io.VarLenFeature(dtype=tf.int64) + + for name in _CATEGORICAL_COLUMN_NAMES: + result[name] = tf.io.VarLenFeature(dtype=tf.string) + + return result + + +def make_preprocessing_fn(frequency_threshold): + """Creates a preprocessing function for criteo. + + Args: + frequency_threshold: The frequency_threshold used when generating + vocabularies for the categorical features. + + Returns: + A preprocessing function. + """ + def preprocessing_fn(inputs): + """User defined preprocessing function for criteo columns. + + Args: + inputs: dictionary of input `tensorflow_transform.Column`. + Returns: + A dictionary of `tensorflow_transform.Column` representing the transformed + columns. + """ + # TODO(b/35001605) Make this "passthrough" more DRY. 
Review Comment: Please remove these b/ references and create issues as appropriate ########## sdks/python/apache_beam/testing/benchmarks/cloudml/cloudml_benchmark_test.py: ########## @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import unittest +import uuid + +import pytest + +try: + import apache_beam.testing.benchmarks.cloudml.cloudml_benchmark_constants_lib as lib + from apache_beam.testing.benchmarks.cloudml.pipelines import workflow + from apache_beam.testing.test_pipeline import TestPipeline +except ImportError: # pylint: disable=bare-except + raise unittest.SkipTest('Dependencies are not installed') + +_INPUT_GCS_BUCKET_ROOT = 'gs://apache-beam-ml/datasets/cloudml/criteo' +_CRITEO_FEATURES_FILE = 'testdata/criteo/expected/features.tfrecord.gz' +_OUTPUT_GCS_BUKCET_ROOT = 'gs://temp-storage-for-end-to-end-tests/tft/' Review Comment: Nit: bucket is misspelled ########## sdks/python/apache_beam/testing/benchmarks/cloudml/pipelines/workflow.py: ########## @@ -0,0 +1,223 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import argparse +import logging +import os + +import apache_beam as beam +import tensorflow_transform as tft +import tensorflow_transform.beam as tft_beam +from apache_beam.testing.benchmarks.cloudml.criteo_tft import criteo +from tensorflow_transform import coders +from tensorflow_transform.tf_metadata import dataset_metadata +from tensorflow_transform.tf_metadata import schema_utils +from tfx_bsl.public import tfxio + +# Name of the column for the synthetic version of the benchmark. +_SYNTHETIC_COLUMN = 'x' + + +class _RecordBatchToPyDict(beam.PTransform): + """Converts PCollections of pa.RecordBatch to python dicts.""" + def __init__(self, input_feature_spec): + self._input_feature_spec = input_feature_spec + + def expand(self, pcoll): + def format_values(instance): + return { + k: v.squeeze(0).tolist() + if v is not None else self._input_feature_spec[k].default_value + for k, + v in instance.items() + } + + return ( + pcoll + | 'RecordBatchToDicts' >> + beam.FlatMap(lambda x: x.to_pandas().to_dict(orient='records')) + | 'FormatPyDictValues' >> beam.Map(format_values)) + + +def _synthetic_preprocessing_fn(inputs): + return { + _SYNTHETIC_COLUMN: tft.compute_and_apply_vocabulary( + inputs[_SYNTHETIC_COLUMN], + + # Execute more codepaths but do no frequency filtration. + frequency_threshold=1, + + # Execute more codepaths but do no top filtration. 
+ top_k=2**31 - 1, + + # Execute more codepaths + num_oov_buckets=10) + } + + +class _PredictionHistogramFn(beam.DoFn): + def __init__(self): + # Beam Metrics API for Distributions only works with integers but + # predictions are floating point numbers. We thus store a "quantized" + # distribution of the prediction with sufficient granularity and for ease + # of human interpretation (eg as a percentage for logistic regression). + self._prediction_distribution = beam.metrics.Metrics.distribution( + self.__class__, 'int(scores[0]*100)') + + def process(self, element): + self._prediction_distribution.update(int(element['scores'][0] * 100)) + + +def setup_pipeline(p, args): + if args.classifier == 'criteo': + input_feature_spec = criteo.make_input_feature_spec() + input_schema = schema_utils.schema_from_feature_spec(input_feature_spec) + input_tfxio = tfxio.BeamRecordCsvTFXIO( + physical_format='text', + column_names=criteo.make_ordered_column_names(), + schema=input_schema, + delimiter=criteo.DEFAULT_DELIMITER, + telemetry_descriptors=['CriteoCloudMLBenchmark']) + preprocessing_fn = criteo.make_preprocessing_fn(args.frequency_threshold) + else: + assert False, 'Unknown args classifier <{}>'.format(args.classifier) + + input_data = p | 'ReadFromText' >> beam.io.textio.ReadFromText( + args.input, coder=beam.coders.BytesCoder()) + + if args.benchmark_type == 'tft': + logging.info('TFT benchmark') + + # Setting TFXIO output format only for Criteo benchmarks to make sure that + # both codepaths are covered. 
+ output_record_batches = args.classifier == 'criteo' + + # pylint: disable=expression-not-assigned + input_metadata = dataset_metadata.DatasetMetadata(schema=input_schema) + ( + input_metadata + | 'WriteInputMetadata' >> tft_beam.WriteMetadata( + os.path.join(args.output, 'raw_metadata'), pipeline=p)) + + with tft_beam.Context(temp_dir=os.path.join(args.output, 'tmp'), + use_deep_copy_optimization=True): + decoded_input_data = ( + input_data | 'DecodeForAnalyze' >> input_tfxio.BeamSource()) + transform_fn = ((decoded_input_data, input_tfxio.TensorAdapterConfig()) + | 'Analyze' >> tft_beam.AnalyzeDataset(preprocessing_fn)) + + if args.shuffle: + # Shuffle the data before any decoding (more compact representation). + input_data |= 'Shuffle' >> beam.transforms.Reshuffle() # pylint: disable=no-value-for-parameter + + decoded_input_data = ( + input_data | 'DecodeForTransform' >> input_tfxio.BeamSource()) + (dataset, + metadata) = ((decoded_input_data, input_tfxio.TensorAdapterConfig()), + transform_fn) | 'Transform' >> tft_beam.TransformDataset( + output_record_batches=output_record_batches) + + if output_record_batches: + + def record_batch_to_examples(batch, unary_passthrough_features): + """Encodes transformed data as tf.Examples.""" + # Ignore unary pass-through features. + del unary_passthrough_features + # From beam: "imports, functions and other variables defined in the + # global context of your __main__ file of your Dataflow pipeline are, by + # default, not available in the worker execution environment, and such + # references will cause a NameError, unless the --save_main_session + # pipeline option is set to True. Please see + # https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors ." 
+ from tfx_bsl.coders.example_coder import RecordBatchToExamples + return RecordBatchToExamples(batch) + + encode_ptransform = beam.FlatMapTuple(record_batch_to_examples) + else: + example_coder = coders.ExampleProtoCoder(metadata.schema) + encode_ptransform = beam.Map(example_coder.encode) + + # TODO: Use WriteDataset instead when it becomes available. + ( + dataset + | 'Encode' >> encode_ptransform + | 'Write' >> beam.io.WriteToTFRecord( + os.path.join(args.output, 'features_train'), + file_name_suffix='.tfrecord.gz')) + # transform_fn | beam.Map(print) + transform_fn | 'WriteTransformFn' >> tft_beam.WriteTransformFn(args.output) + + # TODO: Remember to eventually also save the statistics. + else: + logging.fatal('Unknown benchmark type: %s', args.benchmark_type) + + +def parse_known_args(argv): + """Parses args for this workflow.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + required=True, + help='Input path for input files.') + parser.add_argument( + '--output', + dest='output', + required=True, + help='Output path for output files.') + parser.add_argument( Review Comment: Do we use this argument? ########## .test-infra/jenkins/job_BenchmarkTests_CloudML_Python.groovy: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PhraseTriggeringPostCommitBuilder +import CronJobBuilder + +def cloudMLJob = { scope -> + scope.description('Runs the TFT Criteo Examples on the Dataflow runner.') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(scope) + + // Gradle goals for this job. + scope.steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + commonJobProperties.setGradleSwitches(delegate, 'master', 360) + tasks(':sdks:python:test-suites:dataflow:tftTests') + } + } +} Review Comment: Should we also be running this as a normal postcommit? See, for example, https://github.com/apache/beam/blob/853fdd38aafda9d01648bd3733103b1a34a044a1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L192 ########## .test-infra/jenkins/job_BenchmarkTests_CloudML_Python.groovy: ########## @@ -0,0 +1,45 @@ +/* Review Comment: We should probably follow the naming convention established in https://github.com/apache/beam/blob/853fdd38aafda9d01648bd3733103b1a34a044a1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy (so `beam_CloudMLBenchmarkTests_Python.groovy`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
