[ 
https://issues.apache.org/jira/browse/BEAM-2887?focusedWorklogId=152626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152626
 ]

ASF GitHub Bot logged work on BEAM-2887:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 10:55
            Start Date: 09/Oct/18 10:55
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6504: [BEAM-2887] Remove special FnApi version of wordcount.
URL: https://github.com/apache/beam/pull/6504
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/examples/wordcount_fnapi.py b/sdks/python/apache_beam/examples/wordcount_fnapi.py
deleted file mode 100644
index bf4998af15e..00000000000
--- a/sdks/python/apache_beam/examples/wordcount_fnapi.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A word-counting workflow using the experimental FnApi.
-
-For the stable wordcount example see wordcount.py.
-"""
-
-# TODO(BEAM-2887): Merge with wordcount.py.
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-
-from past.builtins import unicode
-
-import apache_beam as beam
-from apache_beam.io import ReadFromText
-# TODO(BEAM-2887): Enable after the issue is fixed.
-# from apache_beam.io import WriteToText
-from apache_beam.metrics import Metrics
-from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.options.pipeline_options import DebugOptions
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import SetupOptions
-
-
-class WordExtractingDoFn(beam.DoFn):
-  """Parse each line of input text into words."""
-
-  def __init__(self):
-    super(WordExtractingDoFn, self).__init__()
-    self.words_counter = Metrics.counter(self.__class__, 'words')
-    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
-    self.word_lengths_dist = Metrics.distribution(
-        self.__class__, 'word_len_dist')
-    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')
-
-  def process(self, element):
-    """Returns an iterator over the words of this element.
-
-    The element is a line of text.  If the line is blank, note that, too.
-
-    Args:
-      element: the element being processed
-
-    Returns:
-      The processed element.
-    """
-    text_line = element.strip()
-    if not text_line:
-      self.empty_line_counter.inc(1)
-    words = re.findall(r'[A-Za-z\']+', text_line)
-    for w in words:
-      self.words_counter.inc()
-      self.word_lengths_counter.inc(len(w))
-      self.word_lengths_dist.update(len(w))
-    return words
-
-
-def run(argv=None):
-  """Main entry point; defines and runs the wordcount pipeline."""
-  parser = argparse.ArgumentParser()
-  parser.add_argument('--input',
-                      dest='input',
-                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
-                      help='Input file to process.')
-  parser.add_argument('--output',
-                      dest='output',
-                      required=True,
-                      help='Output file to write results to.')
-  known_args, pipeline_args = parser.parse_known_args(argv)
-
-  # We use the save_main_session option because one or more DoFn's in this
-  # workflow rely on global context (e.g., a module imported at module level).
-  pipeline_options = PipelineOptions(pipeline_args)
-  pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Ensure that the experiment flag is set explicitly by the user.
-  debug_options = pipeline_options.view_as(DebugOptions)
-  use_fn_api = (
-      debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
-  assert use_fn_api, 'Enable beam_fn_api experiment, in order run this example.'
-
-  # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> ReadFromText(known_args.input)
-
-  counts = (lines
-            | 'split' >> (beam.ParDo(WordExtractingDoFn())
-                          .with_output_types(unicode))
-            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
-            | 'group_and_sum' >> beam.CombinePerKey(sum))
-
-  # Format the counts into a PCollection of strings.
-  def format_result(word_count):
-    (word, count) = word_count
-    return '%s: %s' % (word, count)
-
-  # pylint: disable=unused-variable
-  output = counts | 'format' >> beam.Map(format_result)
-
-  # Write the output using a "Write" transform that has side effects.
-  # pylint: disable=expression-not-assigned
-
-  # TODO(BEAM-2887): Enable after the issue is fixed.
-  # output | 'write' >> WriteToText(known_args.output)
-
-  result = p.run()
-  result.wait_until_finish()
-
-  # Do not query metrics when creating a template which doesn't run
-  if (not hasattr(result, 'has_job')    # direct runner
-      or result.has_job):               # not just a template creation
-    empty_lines_filter = MetricsFilter().with_name('empty_lines')
-    query_result = result.metrics().query(empty_lines_filter)
-    if query_result['counters']:
-      empty_lines_counter = query_result['counters'][0]
-      logging.info('number of empty lines: %d', empty_lines_counter.committed)
-
-    word_lengths_filter = MetricsFilter().with_name('word_len_dist')
-    query_result = result.metrics().query(word_lengths_filter)
-    if query_result['distributions']:
-      word_lengths_dist = query_result['distributions'][0]
-      logging.info('average word length: %d', word_lengths_dist.committed.mean)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  run()
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 4c262746e36..ee9cee7a11a 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -27,7 +27,6 @@
 from nose.plugins.attrib import attr
 
 from apache_beam.examples import wordcount
-from apache_beam.examples import wordcount_fnapi
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -45,6 +44,13 @@ class WordCountIT(unittest.TestCase):
 
   @attr('IT')
   def test_wordcount_it(self):
+    self._run_wordcount_it()
+
+  @attr('IT', 'ValidatesContainer')
+  def test_wordcount_fnapi_it(self):
+    self._run_wordcount_it(experiment='beam_fn_api')
+
+  def _run_wordcount_it(self, **opts):
     test_pipeline = TestPipeline(is_integration_test=True)
 
     # Set extra options to the pipeline for test purpose
@@ -59,6 +65,7 @@ def test_wordcount_it(self):
                                               sleep_secs)]
     extra_opts = {'output': output,
                   'on_success_matcher': all_of(*pipeline_verifiers)}
+    extra_opts.update(opts)
 
     # Register clean up before pipeline execution
     self.addCleanup(delete_files, [output + '*'])
@@ -67,17 +74,6 @@ def test_wordcount_it(self):
     # and start pipeline job by calling pipeline main function.
     wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
 
-  @attr('IT', 'ValidatesContainer')
-  def test_wordcount_fnapi_it(self):
-    test_pipeline = TestPipeline(is_integration_test=True)
-
-    # Get pipeline options from command argument: --test-pipeline-options,
-    # and start pipeline job by calling pipeline main function.
-    wordcount_fnapi.run(
-        test_pipeline.get_full_options_as_args(
-            experiment='beam_fn_api',
-            on_success_matcher=PipelineStateMatcher()))
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152626)
    Time Spent: 2h 40m  (was: 2.5h)

> Python SDK support for portable pipelines
> -----------------------------------------
>
>                 Key: BEAM-2887
>                 URL: https://issues.apache.org/jira/browse/BEAM-2887
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Henning Rohde
>            Assignee: Ahmet Altay
>            Priority: Major
>              Labels: portability
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to