[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=89205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89205
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 10/Apr/18 00:32
Start Date: 10/Apr/18 00:32
Worklog Time Spent: 10m 
  Work Description: robertwb closed pull request #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index cb74fc06108..e2210aab770 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -211,36 +211,8 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
@@ -248,6 +220,59 @@ def coerce_to_kv_type(element_type):
 
     return GroupByKeyInputVisitor()
 
+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs has a `KV` type.
+
+      TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
+              # Add a map to ('', value) as Dataflow currently only handles
+              # keyed side inputs.
+              pipeline = side_input.pvalue.pipeline
+              new_side_input = _DataflowIterableSideInput(side_input)
+              new_side_input.pvalue = beam.pvalue.PCollection(
+                  pipeline,
+                  element_type=typehints.KV[
+                      str, side_input.pvalue.element_type])
+              parent = transform_node.parent or pipeline._root_transform()
+              map_to_void_key = beam.pipeline.AppliedPTransform(
+                  pipeline,
+                  beam.Map(lambda x: ('', x)),
+  
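
The first hunk above swaps the inline helper for a shared `typehints.coerce_to_kv_type`, whose body is not part of this excerpt. As a rough illustration of the coercion rules visible in the removed lines (two-component tuples pass through, `Any` or an unspecified type widens to a key/value pair of `Any`, unions are coerced member by member), here is a minimal sketch. It assumes the stdlib `typing` module as a stand-in for Beam's `typehints`, so the function below is hypothetical and is not Beam's implementation.

```python
import typing
from typing import Any, Tuple, Union


def coerce_to_kv_type(element_type, label=None):
    """Illustrative stand-in built on stdlib typing, not Beam's typehints."""
    if element_type is None or element_type is Any:
        # Unspecified or Any input widens to a key/value pair of Any.
        return Tuple[Any, Any]
    origin = typing.get_origin(element_type)
    args = typing.get_args(element_type)
    if origin is tuple:
        # Only fixed two-component tuples count as key/value pairs.
        if len(args) == 2 and args[1] is not Ellipsis:
            return element_type
        raise ValueError(
            "Tuple input to %r must have two components. Found %s."
            % (label, element_type))
    if origin is Union:
        # Coerce each member, then union the keys and the values separately.
        members = [coerce_to_kv_type(t, label) for t in args]
        keys = tuple(typing.get_args(m)[0] for m in members)
        values = tuple(typing.get_args(m)[1] for m in members)
        return Tuple[Union[keys], Union[values]]
    raise ValueError(
        "Input to %r must be of Tuple or Any type. Found %s."
        % (label, element_type))
```

Passing a label, as the new call does with `transform_node.full_label`, lets the error message name the offending transform rather than the PCollection.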

[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=89145&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89145
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 09/Apr/18 21:42
Start Date: 09/Apr/18 21:42
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379903086
 
 
   Correct. I was hoping for a fully green run, as this was rolled back once 
before, but at least it verifies the DataflowRunner passes correctly. 
   
   ```
   # Run integration tests on the Google Cloud Dataflow service
   # and validate that jobs finish successfully.
   echo ">>> RUNNING TEST DATAFLOW RUNNER it tests"
   >>> RUNNING TEST DATAFLOW RUNNER it tests
   python setup.py nosetests \
 --attr IT \
 --nocapture \
 --processes=4 \
 --process-timeout=1800 \
 --test-pipeline-options=" \
   --runner=TestDataflowRunner \
   --project=$PROJECT \
   --staging_location=$GCS_LOCATION/staging-it \
   --temp_location=$GCS_LOCATION/temp-it \
   --output=$GCS_LOCATION/py-it-cloud/output \
   --sdk_location=$SDK_LOCATION \
   --num_workers=1 \
   --sleep_secs=20"
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/local/lib/python2.7/site-packages/setuptools/dist.py:397:
 UserWarning: Normalizing '2.5.0.dev' to '2.5.0.dev0'
 normalized_version,
   running nosetests
   running egg_info
   writing requirements to apache_beam.egg-info/requires.txt
   writing apache_beam.egg-info/PKG-INFO
   writing top-level names to apache_beam.egg-info/top_level.txt
   writing dependency_links to apache_beam.egg-info/dependency_links.txt
   writing entry points to apache_beam.egg-info/entry_points.txt
   reading manifest file 'apache_beam.egg-info/SOURCES.txt'
   reading manifest template 'MANIFEST.in'
   warning: no files found matching 'README.md'
   warning: no files found matching 'NOTICE'
   warning: no files found matching 'LICENSE'
   writing manifest file 'apache_beam.egg-info/SOURCES.txt'
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/io/gcp/gcsio.py:166:
 DeprecationWarning: object() takes no parameters
 super(GcsIO, cls).__new__(cls, storage_client))
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/io/gcp/gcsio.py:166:
 DeprecationWarning: object() takes no parameters
 super(GcsIO, cls).__new__(cls, storage_client))
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49:
 DeprecationWarning: options is deprecated since First stable release.. 
References to .options will not be supported
 print('Found: %s.' % self.build_console_url(pipeline.options))
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49:
 DeprecationWarning: options is deprecated since First stable release.. 
References to .options will not be supported
 print('Found: %s.' % self.build_console_url(pipeline.options))
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49:
 DeprecationWarning: options is deprecated since First stable release.. 
References to .options will not be supported
 print('Found: %s.' % self.build_console_url(pipeline.options))
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:779:
 DeprecationWarning: options is deprecated since First stable release.. 
References to .options will not be supported
 transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
   
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py:49:
 DeprecationWarning: options is deprecated since First stable release.. 
References to .options will not be supported
 print('Found: %s.' % self.build_console_url(pipeline.options))
   test_streaming_wordcount_it 
(apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT) ... ok
   test_wordcount_fnapi_it (apache_beam.examples.wordcount_it_test.WordCountIT) 
... ok
   test_bigquery_tornadoes_it 
(apache_beam.examples.cookbook.bigquery_tornadoes_it_test.BigqueryTornadoesIT) 
... ok
   test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
   
   --
   XML: 
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/nosetests.xml
   

[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88997&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88997
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 09/Apr/18 16:22
Start Date: 09/Apr/18 16:22
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #4983: [BEAM-2927] Re-enable 
side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379810387
 
 
   If it helps at all, the failure occurs in hdfs_integration_test.sh, 
specifically in `time docker-compose -p ${PROJECT_NAME} up --exit-code-from 
test \
   --abort-on-container-exit`, so I'd think it's not related to your 
change. I'm seeing the same failure in my postcommits.
   
   
![image](https://user-images.githubusercontent.com/1301740/38509655-6422ca70-3bd7-11e8-99f4-95a4fe94c2a3.png)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88997)
Time Spent: 3h  (was: 2h 50m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
> Fix For: 2.5.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88984
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 09/Apr/18 15:58
Start Date: 09/Apr/18 15:58
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379802719
 
 
   jenkins: retest this please
   
   run python postcommit
   
   




Issue Time Tracking
---

Worklog Id: (was: 88984)
Time Spent: 2h 40m  (was: 2.5h)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
> Fix For: 2.5.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88651
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 07/Apr/18 00:12
Start Date: 07/Apr/18 00:12
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379416476
 
 
   jenkins: retest this please




Issue Time Tracking
---

Worklog Id: (was: 88651)
Time Spent: 2.5h  (was: 2h 20m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
> Fix For: 2.5.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=88647&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88647
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 06/Apr/18 23:44
Start Date: 06/Apr/18 23:44
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-379412603
 
 
   run python postcommit




Issue Time Tracking
---

Worklog Id: (was: 88647)
Time Spent: 2h 20m  (was: 2h 10m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-04-04 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=87837&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87837
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 05/Apr/18 00:35
Start Date: 05/Apr/18 00:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-378786963
 
 
   run python postcommit
   
   




Issue Time Tracking
---

Worklog Id: (was: 87837)
Time Spent: 2h 10m  (was: 2h)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=86130&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86130
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 30/Mar/18 18:54
Start Date: 30/Mar/18 18:54
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983#issuecomment-377595999
 
 
   run python postcommit




Issue Time Tracking
---

Worklog Id: (was: 86130)
Time Spent: 2h  (was: 1h 50m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 2h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=86124&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86124
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 30/Mar/18 18:49
Start Date: 30/Mar/18 18:49
Worklog Time Spent: 10m 
  Work Description: robertwb opened a new pull request #4983: [BEAM-2927] 
Re-enable side inputs for Fn API on Dataflow
URL: https://github.com/apache/beam/pull/4983
 
 
   DESCRIPTION HERE
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   




Issue Time Tracking
---

Worklog Id: (was: 86124)
Time Spent: 1h 50m  (was: 1h 40m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=82553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82553
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 21/Mar/18 00:05
Start Date: 21/Mar/18 00:05
Worklog Time Spent: 10m 
  Work Description: robertwb closed pull request #4781: [BEAM-2927] Python 
support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 7a5b884b1af..82130d6e2b7 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -211,36 +211,8 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
@@ -248,6 +220,59 @@ def coerce_to_kv_type(element_type):
 
     return GroupByKeyInputVisitor()
 
+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection` used as a side inputs has a `KV` type.
+
+      TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
+              # Add a map to ('', value) as Dataflow currently only handles
+              # keyed side inputs.
+              pipeline = side_input.pvalue.pipeline
+              new_side_input = _DataflowIterableSideInput(side_input)
+              new_side_input.pvalue = beam.pvalue.PCollection(
+                  pipeline,
+                  element_type=typehints.KV[
+                      str, side_input.pvalue.element_type])
+              parent = transform_node.parent or pipeline._root_transform()
+              map_to_void_key = beam.pipeline.AppliedPTransform(
+                  pipeline,
+                  beam.Map(lambda x: ('', x)),

[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81899&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81899
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #4781: 
[BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249828
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
          from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
          if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
            pcoll = transform_node.inputs[0]
 -          input_type = pcoll.element_type
 -          # If input_type is not specified, then treat it as `Any`.
 -          if not input_type:
 -            input_type = typehints.Any
 -
 -          def coerce_to_kv_type(element_type):
 -            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
 -              if len(element_type.tuple_types) == 2:
 -                return element_type
 -              else:
 -                raise ValueError(
 -                    "Tuple input to GroupByKey must be have two components. "
 -                    "Found %s for %s" % (element_type, pcoll))
 -            elif isinstance(input_type, typehints.AnyTypeConstraint):
 -              # `Any` type needs to be replaced with a KV[Any, Any] to
 -              # force a KV coder as the main output coder for the pcollection
 -              # preceding a GroupByKey.
 -              return typehints.KV[typehints.Any, typehints.Any]
 -            elif isinstance(element_type, typehints.UnionConstraint):
 -              union_types = [
 -                  coerce_to_kv_type(t) for t in element_type.union_types]
 -              return typehints.KV[
 -                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
 -                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
 -            else:
 -              # TODO: Possibly handle other valid types.
 -              raise ValueError(
 -                  "Input to GroupByKey must be of Tuple or Any type. "
 -                  "Found %s for %s" % (element_type, pcoll))
 -          pcoll.element_type = coerce_to_kv_type(input_type)
 +          pcoll.element_type = typehints.coerce_to_kv_type(
 +              pcoll.element_type, transform_node.full_label)
            key_type, value_type = pcoll.element_type.tuple_types
            if transform_node.outputs:
              transform_node.outputs[None].element_type = typehints.KV[
                  key_type, typehints.Iterable[value_type]]
  
      return GroupByKeyInputVisitor()
  
 +  @staticmethod
 +  def side_input_visitor():
 +    # Imported here to avoid circular dependencies.
 +    # pylint: disable=wrong-import-order, wrong-import-position
 +    from apache_beam.pipeline import PipelineVisitor
 +    from apache_beam.transforms.core import ParDo
 +
 +    class SideInputVisitor(PipelineVisitor):
 +      """Ensures input `PCollection` used as a side inputs have a `KV` type.
 +
 +      TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
 +      we could directly replace the coder instead of mutating the element type.
 +      """
 +      def visit_transform(self, transform_node):
 +        if isinstance(transform_node.transform, ParDo):
 +          new_side_inputs = []
 +          for ix, side_input in enumerate(transform_node.side_inputs):
 +            access_pattern = side_input._side_input_data().access_pattern
 +            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
 
 Review comment:
   Looks like external test coverage for DataflowRunner is pretty sparse... I 
added a unit test of this explicitly (and also tested manually against the 
Dataflow runner). 
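
The unit test mentioned in this comment is not included in the excerpt. As an illustration of the property such a test would presumably check, the sketch below models the rewrite in plain Python: every element of an iterable side input is paired with the void key `''`, the pairs are exposed as a keyed (multimap) view, and the original iterable is recovered by reading that single key. All helper names here are hypothetical; this is a model of the idea under those assumptions, not Beam or Dataflow code.

```python
from collections import defaultdict

VOID_KEY = ''  # the diff keys every element with the empty string


def map_to_void_key(elements):
    """Pair each element with the void key, like beam.Map(lambda x: ('', x))."""
    return [(VOID_KEY, x) for x in elements]


def as_multimap(keyed_elements):
    """Group values by key, roughly how a keyed side input exposes them."""
    multimap = defaultdict(list)
    for key, value in keyed_elements:
        multimap[key].append(value)
    return dict(multimap)


def iterable_view(multimap):
    """Recover the original iterable by reading the single void key."""
    return multimap.get(VOID_KEY, [])
```

A round trip such as `iterable_view(as_multimap(map_to_void_key(xs)))` should give back the contents of `xs`, which is what makes the keyed encoding invisible to the user's `ParDo`.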




Issue Time Tracking
---

Worklog Id: (was: 81899)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message 

[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81897
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #4781: 
[BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249349
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
          from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
          if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
            pcoll = transform_node.inputs[0]
 -          input_type = pcoll.element_type
 -          # If input_type is not specified, then treat it as `Any`.
 -          if not input_type:
 -            input_type = typehints.Any
 -
 -          def coerce_to_kv_type(element_type):
 -            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
 -              if len(element_type.tuple_types) == 2:
 -                return element_type
 -              else:
 -                raise ValueError(
 -                    "Tuple input to GroupByKey must be have two components. "
 -                    "Found %s for %s" % (element_type, pcoll))
 -            elif isinstance(input_type, typehints.AnyTypeConstraint):
 -              # `Any` type needs to be replaced with a KV[Any, Any] to
 -              # force a KV coder as the main output coder for the pcollection
 -              # preceding a GroupByKey.
 -              return typehints.KV[typehints.Any, typehints.Any]
 -            elif isinstance(element_type, typehints.UnionConstraint):
 -              union_types = [
 -                  coerce_to_kv_type(t) for t in element_type.union_types]
 -              return typehints.KV[
 -                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
 -                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
 -            else:
 -              # TODO: Possibly handle other valid types.
 -              raise ValueError(
 -                  "Input to GroupByKey must be of Tuple or Any type. "
 -                  "Found %s for %s" % (element_type, pcoll))
 -          pcoll.element_type = coerce_to_kv_type(input_type)
 +          pcoll.element_type = typehints.coerce_to_kv_type(
 +              pcoll.element_type, transform_node.full_label)
            key_type, value_type = pcoll.element_type.tuple_types
            if transform_node.outputs:
              transform_node.outputs[None].element_type = typehints.KV[
                  key_type, typehints.Iterable[value_type]]
  
      return GroupByKeyInputVisitor()
  
 +  @staticmethod
 +  def side_input_visitor():
 +    # Imported here to avoid circular dependencies.
 +    # pylint: disable=wrong-import-order, wrong-import-position
 +    from apache_beam.pipeline import PipelineVisitor
 +    from apache_beam.transforms.core import ParDo
 +
 +    class SideInputVisitor(PipelineVisitor):
 +      """Ensures input `PCollection` used as a side inputs have a `KV` type.
 +
 +      TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
 +      we could directly replace the coder instead of mutating the element type.
 +      """
 +      def visit_transform(self, transform_node):
 +        if isinstance(transform_node.transform, ParDo):
 +          new_side_inputs = []
 +          for ix, side_input in enumerate(transform_node.side_inputs):
 +            access_pattern = side_input._side_input_data().access_pattern
 +            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
 +              # Add a map to ('', value) as Dataflow currently only handles
 +              # keyed side inputs.
 +              pipeline = side_input.pvalue.pipeline
 +              new_side_input = _DataflowIterableSideInput(side_input)
 +              new_side_input.pvalue = beam.pvalue.PCollection(
 +                  pipeline,
 +                  element_type=typehints.KV[
 +                      str, side_input.pvalue.element_type])
 +              parent = transform_node.parent or pipeline._root_transform()
 
 Review comment:
   That is a good question, but I did run into this case. (Probably a bug, 
filed BEAM-3871 for follow-up.)
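
For readers following the diff above: the comment in SideInputVisitor says an iterable side input is mapped to ('', value) pairs because Dataflow currently only handles keyed side inputs. The following is a minimal plain-Python sketch of that idea, not Beam code. All helper names here are hypothetical and do not appear in the pull request.

```python
from collections import defaultdict


def key_with_empty_string(values):
    """Wrap each element as ('', value) so the collection looks keyed."""
    return [('', value) for value in values]


def group_by_key(pairs):
    """Group (key, value) pairs into a dict of key -> list of values."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def read_iterable_side_input(keyed_pairs):
    """Recover the original elements by looking up the single '' key."""
    return group_by_key(keyed_pairs).get('', [])


side_input = [3, 1, 2]
keyed = key_with_empty_string(side_input)
recovered = read_iterable_side_input(keyed)
```

Because every element shares the same dummy key, a keyed lookup on '' returns the whole collection, which is presumably why the visitor sets the new element type to KV[str, original element type].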


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 81897)
Time Spent: 1.5h  (was: 1h 20m)

> Python SDK support for 

[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81898&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81898
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #4781: 
[BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249397
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
 from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
 if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
   pcoll = transform_node.inputs[0]
-  input_type = pcoll.element_type
-  # If input_type is not specified, then treat it as `Any`.
-  if not input_type:
-input_type = typehints.Any
-
-  def coerce_to_kv_type(element_type):
-if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-  if len(element_type.tuple_types) == 2:
-return element_type
-  else:
-raise ValueError(
-"Tuple input to GroupByKey must be have two components. "
-"Found %s for %s" % (element_type, pcoll))
-elif isinstance(input_type, typehints.AnyTypeConstraint):
-  # `Any` type needs to be replaced with a KV[Any, Any] to
-  # force a KV coder as the main output coder for the pcollection
-  # preceding a GroupByKey.
-  return typehints.KV[typehints.Any, typehints.Any]
-elif isinstance(element_type, typehints.UnionConstraint):
-  union_types = [
-  coerce_to_kv_type(t) for t in element_type.union_types]
-  return typehints.KV[
-  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-else:
-  # TODO: Possibly handle other valid types.
-  raise ValueError(
-  "Input to GroupByKey must be of Tuple or Any type. "
-  "Found %s for %s" % (element_type, pcoll))
-  pcoll.element_type = coerce_to_kv_type(input_type)
+  pcoll.element_type = typehints.coerce_to_kv_type(
+  pcoll.element_type, transform_node.full_label)
   key_type, value_type = pcoll.element_type.tuple_types
   if transform_node.outputs:
 transform_node.outputs[None].element_type = typehints.KV[
 key_type, typehints.Iterable[value_type]]
 
 return GroupByKeyInputVisitor()
 
+  @staticmethod
+  def side_input_visitor():
+# Imported here to avoid circular dependencies.
+# pylint: disable=wrong-import-order, wrong-import-position
+from apache_beam.pipeline import PipelineVisitor
+from apache_beam.transforms.core import ParDo
+
+class SideInputVisitor(PipelineVisitor):
+  """Ensures input `PCollection` used as a side inputs have a `KV` type.
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 81898)
Time Spent: 1.5h  (was: 1h 20m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2927) Python SDK support for portable side input

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81896&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81896
 ]

ASF GitHub Bot logged work on BEAM-2927:


Author: ASF GitHub Bot
Created on: 19/Mar/18 15:57
Start Date: 19/Mar/18 15:57
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #4781: 
[BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249415
 
 

 ##
 File path: sdks/python/apache_beam/typehints/typehints.py
 ##
 @@ -1098,3 +1098,36 @@ def is_consistent_with(sub, base):
 # Nothing but object lives above any type constraints.
 return base == object
   return issubclass(sub, base)
+
+
+def coerce_to_kv_type(element_type, label=None):
 
 Review comment:
   Yes, they do.
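
For context on the helper under review: the diff moves the inline coerce_to_kv_type logic out of GroupByKeyInputVisitor into typehints.py. The sketch below mirrors the removed inline logic, but uses the standard-library typing module as a stand-in for Beam's typehints, so Tuple[K, V] plays the role of KV[K, V]. This is an assumption made to keep the example self-contained. It is not the body that was merged, and gbk_output_type is a hypothetical name.

```python
from typing import Any, Iterable, Tuple, Union, get_args, get_origin


def coerce_to_kv_type(element_type, label=None):
    """Return a two-component tuple type for element_type, or raise."""
    # An unspecified type is treated as Any, and Any becomes KV[Any, Any].
    if element_type is None or element_type is Any:
        return Tuple[Any, Any]
    origin = get_origin(element_type)
    if origin is tuple:
        args = get_args(element_type)
        if len(args) == 2 and Ellipsis not in args:
            return element_type
        raise ValueError(
            "Tuple input to GroupByKey must have two components. "
            "Found %s for %s" % (element_type, label))
    if origin is Union:
        # Coerce each member, then union the keys and values separately.
        members = [coerce_to_kv_type(t, label) for t in get_args(element_type)]
        keys = tuple(get_args(t)[0] for t in members)
        values = tuple(get_args(t)[1] for t in members)
        return Tuple[Union[keys], Union[values]]
    raise ValueError(
        "Input to GroupByKey must be of Tuple or Any type. "
        "Found %s for %s" % (element_type, label))


def gbk_output_type(kv_type):
    """Mirror the visitor: KV[K, V] in, KV[K, Iterable[V]] out."""
    key_type, value_type = get_args(kv_type)
    return Tuple[key_type, Iterable[value_type]]
```

As in the diff, the visitor would first coerce the input element type and then derive the GroupByKey output type from its key and value components.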




Issue Time Tracking
---

Worklog Id: (was: 81896)
Time Spent: 1h 20m  (was: 1h 10m)

> Python SDK support for portable side input
> --
>
> Key: BEAM-2927
> URL: https://issues.apache.org/jira/browse/BEAM-2927
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Henning Rohde
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>



