Fix remaining style issues from auto conversion, switch ref in pylint to stage 
1 (stages in futurize are apparently 1 indexed)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd702b3e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd702b3e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd702b3e

Branch: refs/heads/master
Commit: cd702b3e7ea81fe41952066cfe9a1892d5433bd9
Parents: f364248
Author: Holden Karau <hol...@us.ibm.com>
Authored: Fri Sep 1 21:38:47 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/complete/autocomplete.py    |  5 +++--
 .../examples/complete/game/leader_board.py           |  6 ++++--
 .../apache_beam/examples/complete/game/user_score.py |  6 ++++--
 .../examples/complete/juliaset/juliaset/juliaset.py  |  5 ++++-
 sdks/python/apache_beam/examples/complete/tfidf.py   |  6 ++++--
 .../apache_beam/examples/complete/tfidf_test.py      |  5 ++++-
 .../examples/cookbook/datastore_wordcount.py         | 10 ++++++++--
 .../apache_beam/examples/cookbook/mergecontacts.py   | 15 ++++++++++++---
 .../examples/cookbook/multiple_output_pardo.py       | 10 ++++++++--
 .../python/apache_beam/examples/snippets/snippets.py | 15 ++++++++++++---
 .../apache_beam/examples/streaming_wordcount.py      |  5 ++++-
 .../apache_beam/examples/windowed_wordcount.py       |  5 ++++-
 sdks/python/apache_beam/examples/wordcount.py        | 10 ++++++++--
 .../apache_beam/examples/wordcount_debugging.py      | 10 ++++++++--
 .../python/apache_beam/examples/wordcount_minimal.py |  5 ++++-
 .../runners/portability/maptask_executor_runner.py   |  6 +++++-
 sdks/python/apache_beam/transforms/trigger_test.py   | 10 ++++++++--
 17 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py 
b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 81c5351..c09b78e 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,13 +45,14 @@ def run(argv=None):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
   with beam.Pipeline(options=pipeline_options) as p:
+    def format_result(prefix_candidates):
+      return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
 
     (p  # pylint: disable=expression-not-assigned
      | 'read' >> ReadFromText(known_args.input)
      | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
      | 'TopPerPrefix' >> TopPerPrefix(5)
-     | 'format' >> beam.Map(
-         lambda prefix_candidates: '%s: %s' % (prefix_candidates[0], 
prefix_candidates[1]))
+     | 'format' >> beam.Map(format_result)
      | 'write' >> WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py 
b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 6fc7b5d..c1b15e9 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -330,10 +330,12 @@ def run(argv=None):
          }))
 
     # Get user scores and write the results to BigQuery
+    def format_user_score_sums(user_score):
+      return {'user': user_score[0], 'total_score': user_score[1]}
+
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
-     | 'FormatUserScoreSums' >> beam.Map(
-         lambda user_score: {'user': user_score[0], 'total_score': 
user_score[1]})
+     | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
      | 'WriteUserScoreSums' >> WriteToBigQuery(
          args.table_name + '_users', args.dataset, {
              'user': 'STRING',

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py 
b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 5e093bb..0405bab 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -138,11 +138,13 @@ def run(argv=None):
   args, pipeline_args = parser.parse_known_args(argv)
 
   with beam.Pipeline(argv=pipeline_args) as p:
+    def format_user_score_sums(user_score):
+      return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)
      | 'UserScore' >> UserScore()
-     | 'FormatUserScoreSums' >> beam.Map(
-         lambda user_score: 'user: %s, total_score: %s' % (user_score[0], 
user_score[1]))
+     | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
      | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
 # [END main]
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py 
b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1013168..bb5b185 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -107,8 +107,11 @@ def run(argv=None):  # pylint: disable=missing-docstring
     # Group each coordinate triplet by its x value, then write the coordinates
     # to the output file with an x-coordinate grouping per line.
     # pylint: disable=expression-not-assigned
+    def x_coord_key(x_y_i):
+      return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+
     (coordinates
-     | 'x coord key' >> beam.Map(lambda x_y_i: (x_y_i[0], (x_y_i[0], x_y_i[1], 
x_y_i[2])))
+     | 'x coord key' >> beam.Map(x_coord_key)
      | 'x coord' >> beam.GroupByKey()
      | 'format' >> beam.Map(
          lambda k_coords: ' '.join('(%s, %s, %s)' % c for c in k_coords[1]))

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py 
b/sdks/python/apache_beam/examples/complete/tfidf.py
index 0300505..55404df 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -100,10 +100,12 @@ class TfIdf(beam.PTransform):
     # Adjust the above collection to a mapping from (URI, word) pairs to counts
     # into an isomorphic mapping from URI to (word, count) pairs, to prepare
     # for a join by the URI key.
+    def shift_keys(uri_word_count):
+      return (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))
+
     uri_to_word_and_count = (
         uri_and_word_to_count
-        | 'ShiftKeys' >> beam.Map(
-            lambda uri_word_count: (uri_word_count[0][0], 
(uri_word_count[0][1], uri_word_count[1]))))
+        | 'ShiftKeys' >> beam.Map(shift_keys))
 
     # Perform a CoGroupByKey (a sort of pre-join) on the prepared
     # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py 
b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 957f4c7..71e71e3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -51,6 +51,9 @@ class TfIdfTest(unittest.TestCase):
 
   def test_tfidf_transform(self):
     with TestPipeline() as p:
+      def re_key(word_uri_tfidf):
+        return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+
       uri_to_line = p | 'create sample' >> beam.Create(
           [('1.txt', 'abc def ghi'),
            ('2.txt', 'abc def'),
@@ -58,7 +61,7 @@ class TfIdfTest(unittest.TestCase):
       result = (
           uri_to_line
           | tfidf.TfIdf()
-          | beam.Map(lambda word_uri_tfidf: (word_uri_tfidf[0], 
word_uri_tfidf[1][0], word_uri_tfidf[1][1])))
+          | beam.Map(re_key))
       assert_that(result, equal_to(EXPECTED_RESULTS))
       # Run the pipeline. Note that the assert_that above adds to the pipeline
       # a check that the result PCollection contains expected values.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py 
b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 13c5998..03fcd8a 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -179,15 +179,21 @@ def read_from_datastore(project, user_options, 
pipeline_options):
       project, query, user_options.namespace)
 
   # Count the occurrences of each word.
+  def count_ones(word_ones):
+    return (word_ones[0], sum(word_ones[1]))
+
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1]))))
+            | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], 
word_c[1]))
+  def format_result(w_c):
+    return '%s: %s' % (w_c[0], w_c[1])
+
+  output = counts | 'format' >> beam.Map(format_result)
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py 
b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5a35e51..fdfa286 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -103,12 +103,21 @@ def run(argv=None, assert_results=None):
              '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
 
     # Compute some stats about our database of people.
+    def without_email(name_email_phone_snailmail1):
+      return not next(iter(name_email_phone_snailmail1[1][0]), None)
+
+    def without_phones(name_email_phone_snailmail2):
+      return not next(iter(name_email_phone_snailmail2[1][1]), None)
+
+    def without_address(name_e_p_snailmail):
+      return not next(iter(name_e_p_snailmail[1][2]), None)
+
     luddites = grouped | beam.Filter(  # People without email.
-        lambda name_email_phone_snailmail1: not 
next(iter(name_email_phone_snailmail1[1][0]), None))
+        without_email)
     writers = grouped | beam.Filter(   # People without phones.
-        lambda name_email_phone_snailmail2: not 
next(iter(name_email_phone_snailmail2[1][1]), None))
+        without_phones)
     nomads = grouped | beam.Filter(    # People without addresses.
-        lambda name_e_p_snailmail: not next(iter(name_e_p_snailmail[1][2]), 
None))
+        without_address)
 
     num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
     num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py 
b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 259f95d..fe7929e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -119,11 +119,17 @@ class CountWords(beam.PTransform):
   """
 
   def expand(self, pcoll):
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
     return (pcoll
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1])))
-            | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], 
word_c[1])))
+            | 'count' >> beam.Map(count_ones)
+            | 'format' >> beam.Map(format_result))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py 
b/sdks/python/apache_beam/examples/snippets/snippets.py
index 873f3c3..10080c9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,6 +535,9 @@ def examples_wordcount_templated(renames):
   lines = p | 'Read' >> ReadFromText(wordcount_options.input)
   # [END example_wordcount_templated]
 
+  def format_result(w_c):
+    return '%s: %s' % (w_c[0], w_c[1])
+
   (
       lines
       | 'ExtractWords' >> beam.FlatMap(
@@ -542,7 +545,7 @@ def examples_wordcount_templated(renames):
       | 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
       | 'Group' >> beam.GroupByKey()
       | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
-      | 'Format' >> beam.Map(lambda word_c2: '%s: %s' % (word_c2[0], 
word_c2[1]))
+      | 'Format' >> beam.Map(format_result)
       | 'Write' >> WriteToText(wordcount_options.output)
   )
 
@@ -611,8 +614,11 @@ def examples_wordcount_debugging(renames):
             [('Flourish', 3), ('stomach', 1)]))
     # [END example_wordcount_debugging_assert]
 
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
     output = (filtered_words
-              | 'format' >> beam.Map(lambda word_c1: '%s: %s' % (word_c1[0], 
word_c1[1]))
+              | 'format' >> beam.Map(format_result)
               | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
 
     p.visit(SnippetUtils.RenameFiles(renames))
@@ -1119,6 +1125,9 @@ def model_group_by_key(contents, output_path):
 
   import apache_beam as beam
   with TestPipeline() as p:  # Use TestPipeline for testing.
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     words_and_counts = (
         p
         | beam.Create(contents)
@@ -1133,7 +1142,7 @@ def model_group_by_key(contents, output_path):
     grouped_words = words_and_counts | beam.GroupByKey()
     # [END model_group_by_key_transform]
     (grouped_words
-     | 'count words' >> beam.Map(lambda word_counts: (word_counts[0], 
sum(word_counts[1])))
+     | 'count words' >> beam.Map(count_ones)
      | beam.io.WriteToText(output_path))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py 
b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 8a05991..d1af4ce 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -58,6 +58,9 @@ def run(argv=None):
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     transformed = (lines
                    # Use a pre-defined function that imports the re package.
                    | 'Split' >> (
@@ -65,7 +68,7 @@ def run(argv=None):
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(15, 0))
                    | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1])))
+                   | 'Count' >> beam.Map(count_ones)
                    | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
 
     # Write to PubSub.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py 
b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 680314b..f88b942 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -69,13 +69,16 @@ def run(argv=None):
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     transformed = (lines
                    | 'Split' >> (beam.FlatMap(find_words)
                                  .with_output_types(unicode))
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(2*60, 0))
                    | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1])))
+                   | 'Count' >> beam.Map(count_ones)
                    | 'Format' >> beam.ParDo(FormatDoFn()))
 
     # Write to BigQuery.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py 
b/sdks/python/apache_beam/examples/wordcount.py
index e21e91d..60f1ffe 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -88,15 +88,21 @@ def run(argv=None):
   lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
+  def count_ones(word_ones):
+    return (word_ones[0], sum(word_ones[1]))
+
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1]))))
+            | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], 
word_c[1]))
+  def format_result(w_c):
+    return '%s: %s' % (w_c[0], w_c[1])
+
+  output = counts | 'format' >> beam.Map(format_result)
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py 
b/sdks/python/apache_beam/examples/wordcount_debugging.py
index bdc4c16..7d18c0d 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -93,12 +93,15 @@ class CountWords(beam.PTransform):
   PCollection of (word, count) tuples.
   """
   def expand(self, pcoll):
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1]))))
+            | 'count' >> beam.Map(count_ones))
 
 
 def run(argv=None):
@@ -141,8 +144,11 @@ def run(argv=None):
     # Format the counts into a PCollection of strings and write the output 
using
     # a "Write" transform that has side effects.
     # pylint: disable=unused-variable
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
     output = (filtered_words
-              | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], 
word_c[1]))
+              | 'format' >> beam.Map(format_result)
               | 'write' >> WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py 
b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 01c3955..54fd1f1 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,7 +106,10 @@ def run(argv=None):
         | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
     # Format the counts into a PCollection of strings.
-    output = counts | 'Format' >> beam.Map(lambda w_c: '%s: %s' % (w_c[0], 
w_c[1]))
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
+    output = counts | 'Format' >> beam.Map(format_result)
 
     # Write the output using a "Write" transform that has side effects.
     # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git 
a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py 
b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index d4063df..5b580a6 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -449,7 +449,11 @@ class MergeAccumulators(beam.PTransform):
       return beam.pvalue.PCollection(input.pipeline)
     else:
       merge_accumulators = self.combine_fn.merge_accumulators
-      return input | beam.Map(lambda k_vs: (k_vs[0], 
merge_accumulators(k_vs[1])))
+
+      def merge_with_existing_key(k_vs):
+        return (k_vs[0], merge_accumulators(k_vs[1]))
+
+      return input | beam.Map(merge_with_existing_key)
 
 
 class ExtractOutputs(beam.PTransform):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py 
b/sdks/python/apache_beam/transforms/trigger_test.py
index 5a56c7a..3afabaf 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -404,14 +404,20 @@ class TriggerPipelineTest(unittest.TestCase):
 
   def test_after_count(self):
     with TestPipeline() as p:
+      def construct_timestamped(k_t):
+        return TimestampedValue((k_t[0], k_t[1]), k_t[1])
+
+      def format_result(k_v):
+        return ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))
+
       result = (p
                 | beam.Create([1, 2, 3, 4, 5, 10, 11])
                 | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
-                | beam.Map(lambda k_t: TimestampedValue((k_t[0], k_t[1]), 
k_t[1]))
+                | beam.Map(construct_timestamped)
                 | beam.WindowInto(FixedWindows(10), trigger=AfterCount(3),
                                   
accumulation_mode=AccumulationMode.DISCARDING)
                 | beam.GroupByKey()
-                | beam.Map(lambda k_v: ('%s-%s' % (k_v[0], len(k_v[1])), 
set(k_v[1]))))
+                | beam.Map(format_result))
       assert_that(result, equal_to(
           {
               'A-5': {1, 2, 3, 4, 5},

Reply via email to