chamikaramj commented on a change in pull request #12355:
URL: https://github.com/apache/beam/pull/12355#discussion_r459644349
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
# limitations under the License.
#
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
+
+Java and docker must be available to run this pipeline.
Review comment:
I think we need a README file with more specific instructions for Dataflow
and portable runners so that users can easily try this out. I believe
@TheNeuralBit is working on it.
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
# limitations under the License.
#
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
Review comment:
".. that use the SQL transform." (to differentiate from
wordcount_xlang.py)
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
- # Preemptively start due to BEAM-6666.
- p.runner.create_job_service(pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
+ if isinstance(p.runner, portable_runner.PortableRunner):
Review comment:
Can we generalize this so that users can easily try this out with
Dataflow as well ?
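For what it's worth, a minimal sketch of the guarded version, so that
Dataflow simply skips the job-service step and the rest of the pipeline is
shared (the helper name is mine, not from the PR):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.portability import portable_runner

    def maybe_start_job_service(p, pipeline_options):
      # Portable runners need the job service started preemptively
      # (BEAM-6666); Dataflow and other runners manage this themselves,
      # so this is a no-op for them.
      if isinstance(p.runner, portable_runner.PortableRunner):
        p.runner.create_job_service(pipeline_options)

    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
      maybe_start_job_service(p, pipeline_options)
      # ... build the rest of the pipeline here ...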
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
# limitations under the License.
#
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
+
Review comment:
Can we also briefly describe what this pipeline does and what output it
produces ?
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
from apache_beam.transforms.sql import SqlTransform
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.
+#
+# Here we create and register a simple NamedTuple with a single unicode typed
+# field named 'word' which we will use below.
Review comment:
We should also document the current limitations of sql.py (if any) in the
pydoc of that file.
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
- # Preemptively start due to BEAM-6666.
- p.runner.create_job_service(pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
+ if isinstance(p.runner, portable_runner.PortableRunner):
+ # Preemptively start due to BEAM-6666.
+ p.runner.create_job_service(pipeline_options)
Review comment:
This should not be done for Dataflow.
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
- # Preemptively start due to BEAM-6666.
- p.runner.create_job_service(pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
+ if isinstance(p.runner, portable_runner.PortableRunner):
+ # Preemptively start due to BEAM-6666.
+ p.runner.create_job_service(pipeline_options)
+
+ run(p, known_args.input, known_args.output)
- run(p, known_args.input, known_args.output)
+# Some more fun queries:
+# ------
+# SELECT
+# word as key,
+# COUNT(*) as `count`
+# FROM PCOLLECTION
+# GROUP BY word
Review comment:
Did we confirm that these are working for Dataflow and portable runners ?
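For anyone who wants to verify, a self-contained sketch that plugs the first
of those queries into SqlTransform (Java and Docker must be available, as the
docstring notes; I've used Python 3's str rather than unicode):

    import typing

    import apache_beam as beam
    from apache_beam import coders
    from apache_beam.transforms.sql import SqlTransform

    MyRow = typing.NamedTuple('MyRow', [('word', str)])
    coders.registry.register_coder(MyRow, coders.RowCoder)

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
          | beam.Map(MyRow).with_output_types(MyRow)
          # The first query above, including the ORDER BY / LIMIT clauses
          # whose runner support is in question.
          | SqlTransform(
              """
              SELECT
                word as key,
                COUNT(*) as `count`
              FROM PCOLLECTION
              GROUP BY word
              ORDER BY `count` DESC
              LIMIT 100""")
          | beam.Map(print))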
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
from apache_beam.transforms.sql import SqlTransform
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.
Review comment:
What are the other ways ? We should document all such possibilities in the
SQL transform. Currently sql.py mentions that the input PCollection must
have a NamedTuple type.
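One alternative that comes to mind (an assumption about what the SDK
supports, not something confirmed in this thread): let the SDK infer the
schema from beam.Row instances instead of declaring and registering a
NamedTuple by hand.

    import apache_beam as beam

    with beam.Pipeline() as p:
      rows = (
          p
          | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
          # Each beam.Row carries its field names and types, so the
          # resulting PCollection gets an inferred schema with a single
          # 'word' field.
          | beam.Map(lambda w: beam.Row(word=w)))

Whichever ways end up supported, agreed they belong in the sql.py pydoc.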
##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
from apache_beam.transforms.sql import SqlTransform
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.
+#
+# Here we create and register a simple NamedTuple with a single unicode typed
+# field named 'word' which we will use below.
MyRow = typing.NamedTuple('MyRow', [('word', unicode)])
coders.registry.register_coder(MyRow, coders.RowCoder)
-# Some more fun queries:
-# ------
-# SELECT
-# word as key,
-# COUNT(*) as `count`
-# FROM PCOLLECTION
-# GROUP BY word
-# ORDER BY `count` DESC
-# LIMIT 100
-# ------
-# SELECT
-# len as key,
-# COUNT(*) as `count`
-# FROM (
-# SELECT
-# LENGTH(word) AS len
-# FROM PCOLLECTION
-# )
-# GROUP BY len
-
def run(p, input_file, output_file):
#pylint: disable=expression-not-assigned
(
p
- | 'read' >> ReadFromText(input_file)
- | 'split' >> beam.FlatMap(str.split)
- | 'row' >> beam.Map(MyRow).with_output_types(MyRow)
- | 'sql!!' >> SqlTransform(
+ | 'Read' >> ReadFromText(input_file)
+ | 'Split' >> beam.FlatMap(lambda line: re.split(r'\W+', line))
+ | 'ToRow' >> beam.Map(MyRow).with_output_types(MyRow)
+ | 'Sql!!' >> SqlTransform(
"""
SELECT
word as key,
COUNT(*) as `count`
FROM PCOLLECTION
GROUP BY word""")
- | 'format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))
- | 'write' >> WriteToText(output_file))
-
- result = p.run()
- result.wait_until_finish()
+ | 'Format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))
Review comment:
Are the key and count properties already defined ? Let's clarify that here
in a comment.
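To make that concrete, the comment could spell out where the attributes come
from; a fragment (counted stands in for the SqlTransform output above):

    # SqlTransform names its output row attributes after the SELECT aliases,
    # so `key` and `count` below come from the query, not from MyRow.
    formatted = (
        counted
        | 'Format' >> beam.Map(
            lambda row: '{}: {}'.format(row.key, row.count)))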
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]