[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=89205=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89205 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 10/Apr/18 00:32
Start Date: 10/Apr/18 00:32
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index cb74fc06108..e2210aab770 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -211,36 +211,8 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
@@ -248,6 +220,59 @@ def coerce_to_kv_type(element_type):

     return GroupByKeyInputVisitor()

+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs has a `KV` type.
+
+      TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
+              # Add a map to ('', value) as Dataflow currently only handles
+              # keyed side inputs.
+              pipeline = side_input.pvalue.pipeline
+              new_side_input = _DataflowIterableSideInput(side_input)
+              new_side_input.pvalue = beam.pvalue.PCollection(
+                  pipeline,
+                  element_type=typehints.KV[
+                      str, side_input.pvalue.element_type])
+              parent = transform_node.parent or pipeline._root_transform()
+              map_to_void_key = beam.pipeline.AppliedPTransform(
+                  pipeline,
+                  beam.Map(lambda x: ('', x)),
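The `SideInputVisitor` added in this diff handles iterable side inputs by mapping every element to `('', value)` because, per its own code comment, Dataflow currently only handles keyed side inputs. The pure-Python sketch below illustrates that round trip under simplifying assumptions: a side input is a plain list, and the keyed side input is modeled as a dictionary from key to values. The helper names are hypothetical and are not part of the Beam API.

```python
from collections import defaultdict

# The single "void" key used by the Map in the diff: beam.Map(lambda x: ('', x)).
VOID_KEY = ''


def to_void_keyed(values):
    """Hypothetical helper: wrap each element as ('', value)."""
    return [(VOID_KEY, value) for value in values]


def group_by_key(pairs):
    """Toy model of a keyed side input: key -> list of values, in arrival order."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def read_iterable_side_input(grouped):
    """Recover the original iterable by looking up the void key.

    An empty side input produces no key at all, so default to an empty list.
    """
    return grouped.get(VOID_KEY, [])


if __name__ == '__main__':
    keyed = to_void_keyed([3, 1, 2])
    print(keyed)  # [('', 3), ('', 1), ('', 2)]
    print(read_iterable_side_input(group_by_key(keyed)))  # [3, 1, 2]
```

Because every element shares one key, the whole side input lands in a single group; the preserved order here is a property of this toy model only, not a guarantee of any runner.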
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=89145=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89145 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 09/Apr/18 21:42
Start Date: 09/Apr/18 21:42
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379903086

Correct. I was hoping for a fully green run, as this was rolled back once before, but at least it verifies the DataflowRunner passes correctly.

```
# Run integration tests on the Google Cloud Dataflow service
# and validate that jobs finish successfully.
echo ">>> RUNNING TEST DATAFLOW RUNNER it tests"
>>> RUNNING TEST DATAFLOW RUNNER it tests
python setup.py nosetests \
  --attr IT \
  --nocapture \
  --processes=4 \
  --process-timeout=1800 \
  --test-pipeline-options=" \
    --runner=TestDataflowRunner \
    --project=$PROJECT \
    --staging_location=$GCS_LOCATION/staging-it \
    --temp_location=$GCS_LOCATION/temp-it \
    --output=$GCS_LOCATION/py-it-cloud/output \
    --sdk_location=$SDK_LOCATION \
    --num_workers=1 \
    --sleep_secs=20"
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/local/lib/python2.7/site-packages/setuptools/dist.py:397: UserWarning: Normalizing '2.5.0.dev' to '2.5.0.dev0'
  normalized_version,
running nosetests
running egg_info
writing requirements to apache_beam.egg-info/requires.txt
writing apache_beam.egg-info/PKG-INFO
writing top-level names to apache_beam.egg-info/top_level.txt
writing dependency_links to apache_beam.egg-info/dependency_links.txt
writing entry points to apache_beam.egg-info/entry_points.txt
reading manifest file 'apache_beam.egg-info/SOURCES.txt'
reading manifest template 'MANIFEST.in'
warning: no files found matching 'README.md'
warning: no files found matching 'NOTICE'
warning: no files found matching 'LICENSE'
writing manifest file 'apache_beam.egg-info/SOURCES.txt'
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/io/gcp/gcsio.py:166: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/io/gcp/gcsio.py:166: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49: DeprecationWarning: options is deprecated since First stable release.. References to .options will not be supported
  print('Found: %s.' % self.build_console_url(pipeline.options))
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49: DeprecationWarning: options is deprecated since First stable release.. References to .options will not be supported
  print('Found: %s.' % self.build_console_url(pipeline.options))
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49: DeprecationWarning: options is deprecated since First stable release.. References to .options will not be supported
  print('Found: %s.' % self.build_console_url(pipeline.options))
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:779: DeprecationWarning: options is deprecated since First stable release.. References to .options will not be supported
  transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49: DeprecationWarning: options is deprecated since First stable release.. References to .options will not be supported
  print('Found: %s.' % self.build_console_url(pipeline.options))
test_streaming_wordcount_it (apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT) ... ok
test_wordcount_fnapi_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
test_bigquery_tornadoes_it (apache_beam.examples.cookbook.bigquery_tornadoes_it_test.BigqueryTornadoesIT) ... ok
test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
--
XML: /home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/nosetests.xml
```
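The comment hopes for "a fully green run". As an illustration only, the sketch below scans nose's verbose result lines (the `name (module.Class) ... ok` lines in the console excerpt) and reports whether every listed test passed. The regular expression and the helper names are assumptions made for this sketch; this is not how Jenkins decides whether a build passed.

```python
import re

# Matches nose verbose result lines such as
# "test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok".
RESULT_LINE = re.compile(r'^(\w+) \(([\w.]+)\) \.\.\. (\w+)$')


def parse_nose_results(log_text):
    """Map 'module.Class.test_name' -> status ('ok', 'FAIL', 'ERROR', ...)."""
    results = {}
    for line in log_text.splitlines():
        match = RESULT_LINE.match(line.strip())
        if match:
            test_name, test_class, status = match.groups()
            results['%s.%s' % (test_class, test_name)] = status
    return results


def is_fully_green(results):
    """True only if at least one result was found and every status is 'ok'."""
    return bool(results) and all(s == 'ok' for s in results.values())


LOG = """\
running nosetests
test_streaming_wordcount_it (apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT) ... ok
test_wordcount_fnapi_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
test_bigquery_tornadoes_it (apache_beam.examples.cookbook.bigquery_tornadoes_it_test.BigqueryTornadoesIT) ... ok
test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
"""

if __name__ == '__main__':
    results = parse_nose_results(LOG)
    print(len(results), is_fully_green(results))  # 4 True
```

Treating an empty result set as "not green" is a deliberate choice here: a log with no result lines more likely means the tests never ran than that they all passed.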
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88997=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88997 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 09/Apr/18 16:22
Start Date: 09/Apr/18 16:22
Worklog Time Spent: 10m
Work Description: pabloem commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379810387

If it helps at all, the failure occurs in hdfs_integration_test.sh, specifically in `time docker-compose -p ${PROJECT_NAME} up --exit-code-from test \ --abort-on-container-exit`, so I'd think it's not related to your change. I'm seeing the same failure in my postcommits.

![image](https://user-images.githubusercontent.com/1301740/38509655-6422ca70-3bd7-11e8-99f4-95a4fe94c2a3.png)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 88997)
Time Spent: 3h (was: 2h 50m)

> Python SDK support for portable side input
> ------------------------------------------
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Henning Rohde
> Assignee: Robert Bradshaw
> Priority: Major
> Labels: portability
> Fix For: 2.5.0
>
> Time Spent: 3h
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88984=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88984 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 09/Apr/18 15:58
Start Date: 09/Apr/18 15:58
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379802719

jenkins: retest this please
run python postcommit

Issue Time Tracking
---
Worklog Id: (was: 88984)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88651=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88651 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 07/Apr/18 00:12
Start Date: 07/Apr/18 00:12
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379416476

jenkins: retest this please

Issue Time Tracking
---
Worklog Id: (was: 88651)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88647=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88647 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 06/Apr/18 23:44
Start Date: 06/Apr/18 23:44
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379412603

run python postcommit

Issue Time Tracking
---
Worklog Id: (was: 88647)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=87837=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87837 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 05/Apr/18 00:35
Start Date: 05/Apr/18 00:35
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-378786963

run python postcommit

Issue Time Tracking
---
Worklog Id: (was: 87837)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=86130=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86130 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 30/Mar/18 18:54
Start Date: 30/Mar/18 18:54
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-377595999

run python postcommit

Issue Time Tracking
---
Worklog Id: (was: 86130)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=86124=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86124 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 30/Mar/18 18:49
Start Date: 30/Mar/18 18:49
Worklog Time Spent: 10m
Work Description: robertwb opened a new pull request #4983: [BEAM-2927] Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983

DESCRIPTION HERE

Follow this checklist to help us incorporate your contribution quickly and easily:

 - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
 - [ ] Write a pull request description that is detailed enough to understand:
   - [ ] What the pull request does
   - [ ] Why it does it
   - [ ] How it does it
   - [ ] Why this approach
 - [ ] Each commit in the pull request should have a meaningful subject line and body.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

Issue Time Tracking
---
Worklog Id: (was: 86124)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=82553=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82553 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 21/Mar/18 00:05
Start Date: 21/Mar/18 00:05
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #4781: [BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 7a5b884b1af..82130d6e2b7 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -211,36 +211,8 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
@@ -248,6 +220,59 @@ def coerce_to_kv_type(element_type):

     return GroupByKeyInputVisitor()

+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs has a `KV` type.
+
+      TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
+              # Add a map to ('', value) as Dataflow currently only handles
+              # keyed side inputs.
+              pipeline = side_input.pvalue.pipeline
+              new_side_input = _DataflowIterableSideInput(side_input)
+              new_side_input.pvalue = beam.pvalue.PCollection(
+                  pipeline,
+                  element_type=typehints.KV[
+                      str, side_input.pvalue.element_type])
+              parent = transform_node.parent or pipeline._root_transform()
+              map_to_void_key = beam.pipeline.AppliedPTransform(
+                  pipeline,
+                  beam.Map(lambda x: ('', x)),
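The first hunk of this diff moves the nested `coerce_to_kv_type` out of the runner into `typehints` and passes `transform_node.full_label` as a second argument. The sketch below restates the coercion rules visible in the removed code on a toy type model: an unspecified type is treated as `Any`; a 2-tuple is kept; `Any` becomes `KV[Any, Any]`; a `Union` is coerced member-wise and its key and value types are unioned separately; anything else raises `ValueError`. The dataclasses are hypothetical stand-ins for Beam's type hints, and using the label only in error messages is an assumption. One deliberate difference: this sketch tests `element_type` in every branch, whereas the removed nested function tested the outer `input_type` in its `Any` branch.

```python
from dataclasses import dataclass
from typing import Tuple


# Toy stand-ins for Beam type hints (hypothetical, not apache_beam.typehints).
@dataclass(frozen=True)
class AnyT:
    pass


@dataclass(frozen=True)
class TupleT:
    types: Tuple[object, ...]  # KV[K, V] is modeled as a 2-tuple


@dataclass(frozen=True)
class UnionT:
    types: Tuple[object, ...]


@dataclass(frozen=True)
class IterableT:
    element: object


def kv(key, value):
    return TupleT((key, value))


def coerce_to_kv_type(element_type, label=None):
    """Coerce a GroupByKey input type to KV[K, V], following the removed code's rules."""
    where = label or 'GroupByKey'  # assumed: the label only improves error messages
    if element_type is None:
        # An unspecified input type is treated as Any.
        element_type = AnyT()
    if isinstance(element_type, TupleT):
        if len(element_type.types) == 2:
            return element_type
        raise ValueError('Tuple input to %s must have two components. Found %s'
                         % (where, element_type))
    if isinstance(element_type, AnyT):
        # Any becomes KV[Any, Any] so that a KV coder is chosen.
        return kv(AnyT(), AnyT())
    if isinstance(element_type, UnionT):
        # Coerce each member, then union the key types and value types separately.
        members = [coerce_to_kv_type(t, label) for t in element_type.types]
        return kv(UnionT(tuple(m.types[0] for m in members)),
                  UnionT(tuple(m.types[1] for m in members)))
    raise ValueError('Input to %s must be of Tuple or Any type. Found %s'
                     % (where, element_type))


def group_by_key_output_type(input_type, label=None):
    """GroupByKey output is KV[K, Iterable[V]], as set by the visitor in the diff."""
    key_type, value_type = coerce_to_kv_type(input_type, label).types
    return kv(key_type, IterableT(value_type))
```

For example, `coerce_to_kv_type(UnionT((kv(str, int), kv(bytes, float))))` yields `kv(UnionT((str, bytes)), UnionT((int, float)))`. Beam's real `Union` hint may normalize its members (for example by removing duplicates); this toy `UnionT` does not.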
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81899 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #4781: [BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249828

## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
                 key_type, typehints.Iterable[value_type]]

     return GroupByKeyInputVisitor()

+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs have a `KV` type.
+
+      TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:

Review comment:
Looks like external test coverage for DataflowRunner is pretty sparse... I added a unit test of this explicitly (and also tested manually against the Dataflow runner).

Issue Time Tracking
---
Worklog Id: (was: 81899)
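The review comment notes that external test coverage for the DataflowRunner is sparse and that a unit test was added for the iterable-side-input rewrite. That test is not shown in this thread; the sketch below only illustrates the shape such a test could take, against a toy model in which a ParDo node holds a list of side inputs and the visitor re-keys only the iterable ones. `ITERABLE`, `MULTIMAP`, `SideInput`, `ParDoNode`, and `rewrite_iterable_side_inputs` are all hypothetical names.

```python
import unittest
from dataclasses import dataclass, field, replace
from typing import List

# Hypothetical stand-ins for the access-pattern URNs checked in the diff.
ITERABLE = 'iterable'
MULTIMAP = 'multimap'


@dataclass(frozen=True)
class SideInput:
    name: str
    access_pattern: str
    keyed: bool = False


@dataclass
class ParDoNode:
    label: str
    side_inputs: List[SideInput] = field(default_factory=list)


def rewrite_iterable_side_inputs(nodes):
    """Toy visitor: replace each iterable side input with a keyed copy.

    Returns the labels of the (hypothetical) void-key Map steps that a real
    rewrite would insert in front of those side inputs.
    """
    inserted = []
    for node in nodes:
        new_side_inputs = []
        for side_input in node.side_inputs:
            if side_input.access_pattern == ITERABLE:
                inserted.append('MapToVoidKey(%s)' % side_input.name)
                new_side_inputs.append(replace(side_input, keyed=True))
            else:
                new_side_inputs.append(side_input)
        node.side_inputs = new_side_inputs
    return inserted


class RewriteIterableSideInputsTest(unittest.TestCase):

    def test_only_iterable_side_inputs_are_keyed(self):
        node = ParDoNode('ParDo(Foo)',
                         [SideInput('a', ITERABLE), SideInput('b', MULTIMAP)])
        inserted = rewrite_iterable_side_inputs([node])
        self.assertEqual(inserted, ['MapToVoidKey(a)'])
        self.assertEqual([s.keyed for s in node.side_inputs], [True, False])

    def test_node_without_side_inputs_is_unchanged(self):
        node = ParDoNode('ParDo(Bar)')
        self.assertEqual(rewrite_iterable_side_inputs([node]), [])
        self.assertEqual(node.side_inputs, [])
```

Saved as a module, these tests can be run with `python -m unittest <module_name>`. Testing the rewrite on a toy graph keeps the test independent of the Dataflow service, which matches the comment's point that such coverage is otherwise thin.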
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81897=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81897 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #4781: [BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249349

## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
                 key_type, typehints.Iterable[value_type]]

     return GroupByKeyInputVisitor()

+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs have a `KV` type.
+
+      TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
+              # Add a map to ('', value) as Dataflow currently only handles
+              # keyed side inputs.
+              pipeline = side_input.pvalue.pipeline
+              new_side_input = _DataflowIterableSideInput(side_input)
+              new_side_input.pvalue = beam.pvalue.PCollection(
+                  pipeline,
+                  element_type=typehints.KV[
+                      str, side_input.pvalue.element_type])
+              parent = transform_node.parent or pipeline._root_transform()

Review comment:
That is a good question, but I did run into this case. (Probably a bug, filed BEAM-3871 for follow-up.)

Issue Time Tracking
---
Worklog Id: (was: 81897)
Time Spent: 1.5h (was: 1h 20m)
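The line under review, `parent = transform_node.parent or pipeline._root_transform()`, falls back to the pipeline's root when a transform node has no parent, a case the author reports actually hitting (filed as BEAM-3871). A minimal sketch of that fallback on a hypothetical toy node class, not Beam's `AppliedPTransform`:

```python
class Node(object):
    """Hypothetical toy transform node with a parent link and child parts."""

    def __init__(self, label, parent=None):
        self.label = label
        self.parent = parent
        self.parts = []
        if parent is not None:
            parent.parts.append(self)


def add_sibling(node, new_label, root):
    """Create a new node next to `node`.

    A node may have no parent, so fall back to the root, mirroring
    `transform_node.parent or pipeline._root_transform()`.
    """
    parent = node.parent if node.parent is not None else root
    return Node(new_label, parent)


if __name__ == '__main__':
    root = Node('root')
    orphan = Node('ParDo(Foo)')  # no parent, like the case hit in the review
    sibling = add_sibling(orphan, 'MapToVoidKey', root)
    print(sibling.parent.label)  # root
```

The sketch uses an explicit `is not None` test rather than `or`: `or` falls back whenever the parent is falsy, which would also trigger for an object whose `__len__` or `__bool__` returns a false value. Whether that distinction matters for Beam's own node classes is not established here.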
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81898=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81898 ]

ASF GitHub Bot logged work on BEAM-2927:
Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #4781: [BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249397

## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##
@@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
                 key_type, typehints.Iterable[value_type]]

     return GroupByKeyInputVisitor()

+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs have a `KV` type.

Review comment:
Done.

Issue Time Tracking
---
Worklog Id: (was: 81898)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81896=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81896 ]

ASF GitHub Bot logged work on BEAM-2927:
---------------------------------------

Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #4781: [BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249415

## File path: sdks/python/apache_beam/typehints/typehints.py ##
@@ -1098,3 +1098,36 @@ def is_consistent_with(sub, base):
     # Nothing but object lives above any type constraints.
     return base == object
   return issubclass(sub, base)
+
+
+def coerce_to_kv_type(element_type, label=None):

Review comment:
Yes, they do.

Issue Time Tracking
-------------------

Worklog Id: (was: 81896)
Time Spent: 1h 20m (was: 1h 10m)

> Python SDK support for portable side input
> ------------------------------------------
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Henning Rohde
> Assignee: Robert Bradshaw
> Priority: Major
> Labels: portability
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
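This change adds a module-level `coerce_to_kv_type(element_type, label=None)` to `typehints.py`, and the Dataflow runner calls it with `transform_node.full_label`. Below is a self-contained sketch of the coercion rules visible in the inline code it replaces, using small stand-in type classes (`AnyT`, `TupleT`, `UnionT`, all hypothetical and not Beam's typehints). The rules are: an unspecified type is treated as Any, a two-component tuple passes through, Any is widened to KV[Any, Any], a union is coerced member by member with its key and value types unioned separately, and anything else is rejected. The assumption here is that `label` is used only to name the offending transform in error messages.

```python
# Hypothetical, Beam-free model of coercing an element type to a KV type.
# AnyT, TupleT and UnionT are stand-ins, not apache_beam.typehints classes.
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class AnyT:
    """Stand-in for an `Any` type constraint."""


@dataclass(frozen=True)
class TupleT:
    """Stand-in for a tuple constraint; a 2-tuple plays the role of KV."""
    types: Tuple[object, ...]


@dataclass(frozen=True)
class UnionT:
    """Stand-in for a union constraint."""
    types: Tuple[object, ...]


def coerce_to_kv(element_type, label: Optional[str] = None) -> TupleT:
    """Return a two-component TupleT compatible with element_type.

    `label` (assumed to be the transform's full label) only appears in
    error messages. Raises ValueError if no KV type can be derived.
    """
    name = repr(label) if label else "GroupByKey"
    if element_type is None:
        # An unspecified element type is treated as Any.
        element_type = AnyT()
    if isinstance(element_type, TupleT):
        if len(element_type.types) == 2:
            return element_type
        raise ValueError(
            "Tuple input to %s must have two components. Found %s."
            % (name, element_type))
    if isinstance(element_type, AnyT):
        # Any is widened to KV[Any, Any] so a KV coder can be chosen.
        return TupleT((AnyT(), AnyT()))
    if isinstance(element_type, UnionT):
        # Coerce each member, then union keys and values separately.
        kvs = [coerce_to_kv(t, label) for t in element_type.types]
        return TupleT((UnionT(tuple(kv.types[0] for kv in kvs)),
                       UnionT(tuple(kv.types[1] for kv in kvs))))
    raise ValueError(
        "Input to %s must be a two-component tuple or Any. Found %s."
        % (name, element_type))


print(coerce_to_kv(None))  # TupleT(types=(AnyT(), AnyT()))
```

The sketch tests `element_type` in every branch. The removed inline version tested `input_type` in its `Any` branch, which only makes a difference on the recursive `Union` path.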