This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch revert-10596-uses_keyed_state
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2269a5015dfb0bb3b1f8469d06f1ab622f85a3ab
Author: Ankur <angoe...@users.noreply.github.com>
AuthorDate: Tue Jan 21 13:43:04 2020 -0800

    Revert "[BEAM-9122] Add uses_keyed_state step property in python dataflow run… (#10596)"

    This reverts commit 52b478eeccfd19bfc05fecd519f7dc54db8c67eb.
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++-----
 sdks/python/apache_beam/runners/dataflow/internal/names.py  | 1 -
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ada700c..69b1fb8 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -52,7 +52,6 @@ from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.portability import common_urns
 from apache_beam.pvalue import AsSideInput
-from apache_beam.runners.common import DoFnSignature
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -953,10 +952,6 @@ class DataflowRunner(PipelineRunner):
       step.add_property(PropertyNames.RESTRICTION_ENCODING,
                         self._get_cloud_encoding(restriction_coder))
 
-    if options.view_as(StandardOptions).streaming and DoFnSignature(
-        transform.dofn).is_stateful_dofn():
-      step.add_property(PropertyNames.USES_KEYED_STATE, "true")
-
   @staticmethod
   def _pardo_fn_data(transform_node, get_label):
     transform = transform_node.transform
@@ -1134,6 +1129,7 @@ class DataflowRunner(PipelineRunner):
           coders.registry.get_coder(transform_node.outputs[None].element_type),
           coders.coders.GlobalWindowCoder())
 
+    from apache_beam.runners.dataflow.internal import apiclient
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
@@ -1219,6 +1215,7 @@ class DataflowRunner(PipelineRunner):
     # correct coder.
     coder = coders.WindowedValueCoder(transform.sink.coder,
                                       coders.coders.GlobalWindowCoder())
+    from apache_beam.runners.dataflow.internal import apiclient
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index e9b34d4..7bc0295 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -123,7 +123,6 @@ class PropertyNames(object):
   USE_INDEXED_FORMAT = 'use_indexed_format'
   USER_FN = 'user_fn'
   USER_NAME = 'user_name'
-  USES_KEYED_STATE = 'uses_keyed_state'
   VALIDATE_SINK = 'validate_sink'
   VALIDATE_SOURCE = 'validate_source'
   VALUE = 'value'