chamikaramj commented on a change in pull request #12355:
URL: https://github.com/apache/beam/pull/12355#discussion_r459644349



##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
 # limitations under the License.
 #
 
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
+
+Java and docker must be available to run this pipeline.

Review comment:
       I think we need a README file with more specific instructions for
Dataflow and the portable runners so that users can easily try this out. I
believe @TheNeuralBit is working on it.
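
For example, the README might show the option sets for each runner along
these lines (a sketch; flag values below are illustrative placeholders, not
from this PR):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Portable runner (e.g. Flink); workers run in docker containers.
    portable_options = PipelineOptions([
        '--runner=FlinkRunner',
        '--environment_type=DOCKER',
    ])

    # Dataflow; project, region and bucket names are placeholders.
    dataflow_options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=us-central1',
        '--temp_location=gs://my-bucket/tmp',
    ])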

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
 # limitations under the License.
 #
 
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.

Review comment:
       ".. that use the SQL transform."  (to differentiate from 
wordcount_xlang.py)

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options.view_as(SetupOptions).save_main_session = True
 
-  p = beam.Pipeline(options=pipeline_options)
-  # Preemptively start due to BEAM-6666.
-  p.runner.create_job_service(pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
+    if isinstance(p.runner, portable_runner.PortableRunner):

Review comment:
       Can we generalize this so that users can easily try this out with
Dataflow as well?
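
For example (a sketch of the idea, not a concrete proposal), the workaround
could live behind a small helper so the rest of the pipeline stays
runner-agnostic:

    from apache_beam.runners.portability import portable_runner

    def maybe_start_job_service(p, pipeline_options):
      # The BEAM-6666 workaround only applies to portable runners;
      # Dataflow and other runners should skip it.
      if isinstance(p.runner, portable_runner.PortableRunner):
        p.runner.create_job_service(pipeline_options)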

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
 # limitations under the License.
 #
 
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
+

Review comment:
       Can we also briefly describe what this pipeline does and what output
it produces?
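
Something along these lines, say (wording illustrative, based on what the
code does):

    """A cross-language word-counting workflow that uses the SQL transform.

    Reads a text file, splits each line into words, counts occurrences of
    each word with a SQL GROUP BY query run through the Java expansion
    service, and writes 'word: count' lines to the output file.
    """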

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms.sql import SqlTransform
 
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.
+#
+# Here we create and register a simple NamedTuple with a single unicode typed
+# field named 'word' which we will use below.

Review comment:
       We should also document any current limitations of sql.py in that
file's pydoc.

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options.view_as(SetupOptions).save_main_session = True
 
-  p = beam.Pipeline(options=pipeline_options)
-  # Preemptively start due to BEAM-6666.
-  p.runner.create_job_service(pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
+    if isinstance(p.runner, portable_runner.PortableRunner):
+      # Preemptively start due to BEAM-6666.
+      p.runner.create_job_service(pipeline_options)

Review comment:
       This should not be done for Dataflow.

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options.view_as(SetupOptions).save_main_session = True
 
-  p = beam.Pipeline(options=pipeline_options)
-  # Preemptively start due to BEAM-6666.
-  p.runner.create_job_service(pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
+    if isinstance(p.runner, portable_runner.PortableRunner):
+      # Preemptively start due to BEAM-6666.
+      p.runner.create_job_service(pipeline_options)
+
+    run(p, known_args.input, known_args.output)
 
-  run(p, known_args.input, known_args.output)
 
+# Some more fun queries:
+# ------
+# SELECT
+#   word as key,
+#   COUNT(*) as `count`
+# FROM PCOLLECTION
+# GROUP BY word

Review comment:
       Did we confirm that these queries work for Dataflow and the portable
runners?

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms.sql import SqlTransform
 
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.

Review comment:
       What are the other ways? We should document all such possibilities in
the SQL transform's pydoc. Currently sql.py only mentions that the input
PCollection must have a NamedTuple type.
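
One alternative worth documenting there (a sketch; depends on the SDK's
schema inference support) is producing beam.Row instances and letting the
SDK infer the schema, instead of registering a NamedTuple with RowCoder:

    import apache_beam as beam

    with beam.Pipeline() as p:
      rows = (
          p
          | beam.Create(['hello', 'world'])
          # A schema with a single 'word' field is inferred from the Row.
          | beam.Map(lambda w: beam.Row(word=w)))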

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms.sql import SqlTransform
 
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.
+#
+# Here we create and register a simple NamedTuple with a single unicode typed
+# field named 'word' which we will use below.
 MyRow = typing.NamedTuple('MyRow', [('word', unicode)])
 coders.registry.register_coder(MyRow, coders.RowCoder)
 
-# Some more fun queries:
-# ------
-# SELECT
-#   word as key,
-#   COUNT(*) as `count`
-# FROM PCOLLECTION
-# GROUP BY word
-# ORDER BY `count` DESC
-# LIMIT 100
-# ------
-# SELECT
-#   len as key,
-#   COUNT(*) as `count`
-# FROM (
-#   SELECT
-#     LENGTH(word) AS len
-#   FROM PCOLLECTION
-# )
-# GROUP BY len
-
 
 def run(p, input_file, output_file):
   #pylint: disable=expression-not-assigned
   (
       p
-      | 'read' >> ReadFromText(input_file)
-      | 'split' >> beam.FlatMap(str.split)
-      | 'row' >> beam.Map(MyRow).with_output_types(MyRow)
-      | 'sql!!' >> SqlTransform(
+      | 'Read' >> ReadFromText(input_file)
+      | 'Split' >> beam.FlatMap(lambda line: re.split(r'\W+', line))
+      | 'ToRow' >> beam.Map(MyRow).with_output_types(MyRow)
+      | 'Sql!!' >> SqlTransform(
           """
                    SELECT
                      word as key,
                      COUNT(*) as `count`
                    FROM PCOLLECTION
                    GROUP BY word""")
-      | 'format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))
-      | 'write' >> WriteToText(output_file))
-
-  result = p.run()
-  result.wait_until_finish()
+      | 'Format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))

Review comment:
       Are the key and count properties already defined? Let's clarify that
here in a comment.
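
Something like (suggested wording only):

    # 'key' and 'count' are the field names of the rows SqlTransform emits,
    # taken from the aliases in its SELECT clause above.
    | 'Format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))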




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

