[GitHub] incubator-beam pull request #453: Wait for Elements to be fetched in KafkaIO...

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/453


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: Wait for Elements to be fetched in KafkaIO#start

2016-06-21 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3ff98ea23 -> f4809446b


Wait for Elements to be fetched in KafkaIO#start

This makes it more likely that the reader has elements after the call to
start returns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15f69edf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15f69edf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15f69edf

Branch: refs/heads/master
Commit: 15f69edf80237152739a737e7e84c9ec933d372c
Parents: 3ff98ea
Author: Thomas Groh 
Authored: Mon Jun 13 15:32:19 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 23:28:02 2016 -0700

--
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15f69edf/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index d540a8d..3b64bd5 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -759,6 +759,8 @@ public class KafkaIO {
 private Iterator curBatch = Collections.emptyIterator();
 
 private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+// how long to wait for new records from kafka consumer inside start()
+private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
 // how long to wait for new records from kafka consumer inside advance()
private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
@@ -891,12 +893,12 @@ public class KafkaIO {
   LOG.info("{}: Returning from consumer pool loop", this);
 }
 
-private void nextBatch() {
+private void nextBatch(Duration timeout) {
   curBatch = Collections.emptyIterator();
 
   ConsumerRecords records;
   try {
-records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+records = availableRecordsQueue.poll(timeout.getMillis(),
 TimeUnit.MILLISECONDS);
   } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
@@ -965,6 +967,9 @@ public class KafkaIO {
 }
   }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
+  // Wait for longer than normal when fetching a batch to improve chances a record is available
+  // when start() returns.
+  nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
   return advance();
 }
 
@@ -1028,7 +1033,7 @@ public class KafkaIO {
   return true;
 
 } else { // -- (b)
-  nextBatch();
+  nextBatch(NEW_RECORDS_POLL_TIMEOUT);
 
   if (!curBatch.hasNext()) {
 return false;
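The patch above parameterizes nextBatch with a poll timeout so that start() can wait longer (5 s) than advance() (10 ms) for the consumer thread's first batch. A minimal Python sketch of that pattern (hypothetical class and field names, not Beam's actual KafkaIO code):

```python
import queue
from datetime import timedelta

# Timeouts mirroring the patch: start() waits much longer than advance(),
# so the first batch is likely available by the time start() returns.
START_NEW_RECORDS_POLL_TIMEOUT = timedelta(seconds=5)
NEW_RECORDS_POLL_TIMEOUT = timedelta(milliseconds=10)

class BatchingReader:
    """Sketch of a reader fed batches by a background consumer thread."""

    def __init__(self):
        self.available_records = queue.Queue()  # filled by the consumer
        self.cur_batch = iter(())
        self.current = None

    def _next_batch(self, timeout):
        # Wait up to `timeout` for the consumer to hand over a batch;
        # on timeout leave cur_batch empty (like poll() returning null).
        self.cur_batch = iter(())
        try:
            records = self.available_records.get(timeout=timeout.total_seconds())
        except queue.Empty:
            return
        self.cur_batch = iter(records)

    def start(self):
        # Wait longer than normal so a record is likely available on return.
        self._next_batch(START_NEW_RECORDS_POLL_TIMEOUT)
        return self.advance()

    def advance(self):
        self.current = next(self.cur_batch, None)
        if self.current is not None:
            return True
        # Batch exhausted: try briefly to fetch the next one.
        self._next_batch(NEW_RECORDS_POLL_TIMEOUT)
        self.current = next(self.cur_batch, None)
        return self.current is not None
```

Since start() blocks only until a batch arrives, callers pay the long timeout solely when the consumer is slow to produce the first batch.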



[2/2] incubator-beam git commit: Closes #453

2016-06-21 Thread dhalperi
Closes #453


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f4809446
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f4809446
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f4809446

Branch: refs/heads/master
Commit: f4809446b931c02e1dc5da0d86f01faf00b53581
Parents: 3ff98ea 15f69ed
Author: Dan Halperin 
Authored: Tue Jun 21 23:28:03 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 23:28:03 2016 -0700

--
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--




[GitHub] incubator-beam pull request #507: [BEAM-360] Implements a framework for deve...

2016-06-21 Thread chamikaramj
Github user chamikaramj closed the pull request at:

https://github.com/apache/incubator-beam/pull/507




[jira] [Commented] (BEAM-360) Add a framework for creating Python-SDK sources for new file types

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343782#comment-15343782
 ] 

ASF GitHub Bot commented on BEAM-360:
-

Github user chamikaramj closed the pull request at:

https://github.com/apache/incubator-beam/pull/507


> Add a framework for creating Python-SDK sources for new file types
> --
>
> Key: BEAM-360
> URL: https://issues.apache.org/jira/browse/BEAM-360
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py
>Reporter: Chamikara Jayalath
>Assignee: Chamikara Jayalath
>
> We already have a framework for creating new sources for Beam Python SDK - 
> https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/iobase.py#L326
> It would be great if we could add a framework on top of this that
> encapsulates logic common to sources that are based on files. Such a
> framework could include the following features:
> (1) glob expansion
> (2) support for new file-systems
> (3) dynamic work rebalancing based on byte offsets
> (4) support for reading compressed files.
> Java SDK has a similar framework and it's available at - 
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
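The features listed above center on expanding a file pattern and splitting files into byte ranges. A minimal Python sketch of features (1) and (3), under assumed helper names (not the Beam filebasedsource API):

```python
import glob
import os

def expand_file_pattern(file_pattern):
    # Feature (1): expand a glob such as '/mypath/myavrofiles*' into concrete
    # file names (sorted for deterministic bundle assignment).
    return sorted(glob.glob(file_pattern))

def split_into_bundles(file_names, desired_bundle_size):
    # Feature (3): cut each file into (name, start, end) byte ranges; a runner
    # can hand these out in parallel and rebalance on byte offsets.
    bundles = []
    for name in file_names:
        size = os.path.getsize(name)
        start = 0
        while start < size:
            end = min(start + desired_bundle_size, size)
            bundles.append((name, start, end))
            start = end
    return bundles
```

Support for new file systems and compressed files would layer on top of this by abstracting how a (name, start, end) range is opened and read.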



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (BEAM-367) GetFractionConsumed() inaccurate for non-uniform records

2016-06-21 Thread Daniel Halperin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin closed BEAM-367.

Resolution: Fixed

> GetFractionConsumed() inaccurate for non-uniform records
> 
>
> Key: BEAM-367
> URL: https://issues.apache.org/jira/browse/BEAM-367
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Ian Zhou
>Assignee: Daniel Halperin
>Priority: Minor
>
> GetFractionConsumed() provides inaccurate progress updates for clustered 
> records. For example, for a range spanning [1, 10], a cluster of records 
> around 5 (e.g. 5.01 ..., 5.09) will be recorded as ~50% complete upon 
> reading the first record, and will remain at this percentage until the final 
> record has been read. Instead, the start of the range should be changed to 
> the first record seen (e.g. new range [5.01, 10]). The end of the range 
> can be changed over time through dynamic work rebalancing.
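The arithmetic behind the fix can be checked directly. A small Python sketch of the post-change fraction formula (hypothetical function name, mirroring OffsetRangeTracker.getFractionConsumed):

```python
def fraction_consumed(start_offset, stop_offset, last_record_start):
    # Post-change formula: offset of the last-returned record relative to the
    # tracked range, clamped to at most 1.0.
    return min(1.0, float(last_record_start - start_offset)
               / (stop_offset - start_offset))

# With the original range [1, 10], a cluster at 5.01..5.09 reports ~45% for
# the whole read; after resetting the range start to the first record seen,
# the same cluster starts near 0% and progresses as records are consumed.
stuck = fraction_consumed(1, 10, 5.05)        # stays around 0.45
adjusted = fraction_consumed(5.01, 10, 5.05)  # small, and keeps moving
```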





[jira] [Commented] (BEAM-367) GetFractionConsumed() inaccurate for non-uniform records

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343729#comment-15343729
 ] 

ASF GitHub Bot commented on BEAM-367:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/515


> GetFractionConsumed() inaccurate for non-uniform records
> 
>
> Key: BEAM-367
> URL: https://issues.apache.org/jira/browse/BEAM-367
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Ian Zhou
>Assignee: Daniel Halperin
>Priority: Minor
>
> GetFractionConsumed() provides inaccurate progress updates for clustered 
> records. For example, for a range spanning [1, 10], a cluster of records 
> around 5 (e.g. 5.01 ..., 5.09) will be recorded as ~50% complete upon 
> reading the first record, and will remain at this percentage until the final 
> record has been read. Instead, the start of the range should be changed to 
> the first record seen (e.g. new range [5.01, 10]). The end of the range 
> can be changed over time through dynamic work rebalancing.





[2/2] incubator-beam git commit: Update OffsetRangeTracker progress tracking and start offset

2016-06-21 Thread dhalperi
Update OffsetRangeTracker progress tracking and start offset


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee0a3bf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee0a3bf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee0a3bf2

Branch: refs/heads/master
Commit: ee0a3bf2acc0213e27d3b3d1353c27b977046577
Parents: 741ef26
Author: Ian Zhou 
Authored: Tue Jun 21 17:23:09 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 22:29:25 2016 -0700

--
 .../beam/sdk/io/range/OffsetRangeTracker.java   | 13 +--
 .../beam/sdk/io/OffsetBasedSourceTest.java  |  5 +-
 .../sdk/io/range/OffsetRangeTrackerTest.java| 91 +++-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java |  3 -
 4 files changed, 82 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 76790af..a8d00ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 public class OffsetRangeTracker implements RangeTracker {
  private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
 
-  private final long startOffset;
+  private long startOffset;
   private long stopOffset;
   private long lastRecordStart = -1L;
   private long offsetOfLastSplitPoint = -1L;
@@ -101,6 +101,9 @@ public class OffsetRangeTracker implements RangeTracker {
   lastRecordStart));
 }
 
+if (lastRecordStart == -1) {
+  startOffset = recordStart;
+}
 lastRecordStart = recordStart;
 
 if (isAtSplitPoint) {
@@ -165,7 +168,7 @@ public class OffsetRangeTracker implements RangeTracker {
   throw new IllegalArgumentException(
  "getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
 }
-return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset));
+return (long) Math.floor(startOffset + fraction * (stopOffset - startOffset));
   }
 
   @Override
@@ -179,11 +182,11 @@ public class OffsetRangeTracker implements RangeTracker {
 } else if (lastRecordStart >= stopOffset) {
   return 1.0;
 } else {
-  // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
-  // which is (4 - 3 + 1) / (6 - 3) = 67%.
+  // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3 of 3,4,5
+  // which is (4 - 3) / (6 - 3) = 33%.
   // Also, clamp to at most 1.0 because the last consumed position can extend past the
   // stop position.
-  return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
+  return Math.min(1.0, 1.0 * (lastRecordStart - startOffset) / (stopOffset - startOffset));
 }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 66abd33..7009023 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -218,13 +218,14 @@ public class OffsetBasedSourceTest {
 
   assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
   assertTrue(reader.start());
-  do {
+  items.add(reader.getCurrent());
+  while (reader.advance()) {
 Double fraction = reader.getFractionConsumed();
 assertNotNull(fraction);
 assertTrue(fraction.toString(), fraction > 0.0);
 assertTrue(fraction.toString(), fraction <= 1.0);
 items.add(reader.getCurrent());
-  } while (reader.advance());
+  }
   assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
 
   assertEquals(20, items.size());
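The test rewrite above follows the reader protocol: start() positions the reader at the first record, and each successful advance() yields exactly one more. A minimal Python sketch of that protocol (hypothetical classes, not Beam's BoundedReader API):

```python
class RangeReader:
    """Minimal reader following the start()/advance()/get_current() protocol."""

    def __init__(self, values):
        self._values = values
        self._index = -1

    def start(self):
        # Position at the first record; True iff one exists.
        self._index = 0
        return self._index < len(self._values)

    def advance(self):
        # Move to the next record; True iff one exists.
        self._index += 1
        return self._index < len(self._values)

    def get_current(self):
        return self._values[self._index]

def read_all(reader):
    # Consumption loop matching the rewritten test: take the record produced
    # by start() first, then one record per successful advance().
    items = []
    if reader.start():
        items.append(reader.get_current())
        while reader.advance():
            items.append(reader.get_current())
    return items
```

The original do/while loop read the first record only after calling advance(), which skips the record that start() already positioned on.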

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range

[GitHub] incubator-beam pull request #515: [BEAM-367] Modify offset range tracker to ...

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/515




[1/2] incubator-beam git commit: Closes #515

2016-06-21 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/master 741ef266f -> 3ff98ea23


Closes #515


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ff98ea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ff98ea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ff98ea2

Branch: refs/heads/master
Commit: 3ff98ea23c97bead2644d94d2d776e6cbdbc219a
Parents: 741ef26 ee0a3bf
Author: Dan Halperin 
Authored: Tue Jun 21 22:29:25 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 22:29:25 2016 -0700

--
 .../beam/sdk/io/range/OffsetRangeTracker.java   | 13 +--
 .../beam/sdk/io/OffsetBasedSourceTest.java  |  5 +-
 .../sdk/io/range/OffsetRangeTrackerTest.java| 91 +++-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java |  3 -
 4 files changed, 82 insertions(+), 30 deletions(-)
--




[1/2] incubator-beam git commit: Closes #507

2016-06-21 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 4840f5275 -> e3a43fb5c


Closes #507


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3a43fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3a43fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3a43fb5

Branch: refs/heads/python-sdk
Commit: e3a43fb5c0530fcfb0e06ef86b78fedd912bbc54
Parents: 4840f52 2ebd137
Author: Dan Halperin 
Authored: Tue Jun 21 22:08:47 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 22:08:47 2016 -0700

--
 sdks/python/apache_beam/io/avroio.py| 205 +
 sdks/python/apache_beam/io/avroio_test.py   | 147 +++
 sdks/python/apache_beam/io/filebasedsource.py   | 246 +++
 .../apache_beam/io/filebasedsource_test.py  | 416 +++
 sdks/python/apache_beam/io/iobase.py|  26 +-
 sdks/python/apache_beam/io/range_trackers.py|  14 +-
 sdks/python/apache_beam/io/sources_test.py  |  46 +-
 .../python/apache_beam/runners/direct_runner.py |   5 +-
 sdks/python/setup.py|   1 +
 9 files changed, 1094 insertions(+), 12 deletions(-)
--




[2/2] incubator-beam git commit: Implements a framework for developing sources for new file types.

2016-06-21 Thread dhalperi
Implements a framework for developing sources for new file types.

Module 'filebasedsource' provides a framework for creating sources for new 
file types. This framework readily implements several features common to many 
sources based on files.

Additionally, module 'avroio' contains a new source, 'AvroSource', that is 
implemented using the framework described above. 'AvroSource' is a source for 
reading Avro files.

Adds many unit tests for 'filebasedsource' and 'avroio' modules.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ebd137b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ebd137b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ebd137b

Branch: refs/heads/python-sdk
Commit: 2ebd137b28acd6f5c8bfd6994973b2ebbde5c84f
Parents: 4840f52
Author: Chamikara Jayalath 
Authored: Mon Jun 20 18:09:50 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 22:08:47 2016 -0700

--
 sdks/python/apache_beam/io/avroio.py| 205 +
 sdks/python/apache_beam/io/avroio_test.py   | 147 +++
 sdks/python/apache_beam/io/filebasedsource.py   | 246 +++
 .../apache_beam/io/filebasedsource_test.py  | 416 +++
 sdks/python/apache_beam/io/iobase.py|  26 +-
 sdks/python/apache_beam/io/range_trackers.py|  14 +-
 sdks/python/apache_beam/io/sources_test.py  |  46 +-
 .../python/apache_beam/runners/direct_runner.py |   5 +-
 sdks/python/setup.py|   1 +
 9 files changed, 1094 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ebd137b/sdks/python/apache_beam/io/avroio.py
--
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
new file mode 100644
index 000..022a68d
--- /dev/null
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -0,0 +1,205 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Implements a source for reading Avro files."""
+
+import os
+import StringIO
+import zlib
+
+from apache_beam.io import filebasedsource
+from avro import datafile
+from avro import io as avro_io
+from avro import schema
+
+
+class AvroSource(filebasedsource.FileBasedSource):
+  """A source for reading Avro files.
+
+  ``AvroSource`` is implemented using the file-based source framework available
+  in module 'filebasedsource'. Hence please refer to module 'filebasedsource'
+  to fully understand how this source implements operations common to all
+  file-based sources such as file-pattern expansion and splitting into bundles
+  for parallel processing.
+
+  If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
+  files, a ``PCollection`` for the records in these Avro files can be created in
+  the following manner.
+
+p = df.Pipeline(argv=pipeline_args)
+records = p | df.io.Read('Read', avroio.AvroSource('/mypath/myavrofiles*'))
+
+  Each record of this ``PCollection`` will contain a Python dictionary that
+  complies with the schema contained in the Avro file that contains that
+  particular record.
+  Keys of each dictionary will contain the corresponding field names and will
+  be of type ``string``. Values of the dictionary will be of the type defined in
+  the corresponding Avro schema.
+
+  For example, if the schema of the Avro file is the following:
+  {"namespace": "example.avro","type": "record","name": "User","fields":
+  [{"name": "name", "type": "string"},
+   {"name": "favorite_number",  "type": ["int", "null"]},
+   {"name": "favorite_color", "type": ["string", "null"]}]}
+
+  Then records generated by ``AvroSource`` will be dictionaries of the following
+  form.
+  {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}.
+  """
+
+  def __init__(self, file_pattern, min_bundle_size=0):
+super(AvroSource, self).__init__(file_pattern, min_bundle_size)
+self._avro_schema = None
+self._codec = None
+self._sync_marker = None
+
+  class

[2/2] incubator-beam git commit: This closes #492

2016-06-21 Thread kenn
This closes #492


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/741ef266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/741ef266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/741ef266

Branch: refs/heads/master
Commit: 741ef266f7d892896faba9f8d13bcd8bb7a61ae3
Parents: e255cd6 916bf3a
Author: Kenneth Knowles 
Authored: Tue Jun 21 21:38:19 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 21:38:19 2016 -0700

--
 .../org/apache/beam/sdk/coders/SerializableCoderTest.java| 5 +
 .../test/java/org/apache/beam/sdk/transforms/CreateTest.java | 4 
 .../java/org/apache/beam/sdk/util/GatherAllPanesTest.java| 8 
 3 files changed, 17 insertions(+)
--




[GitHub] incubator-beam pull request #492: Run Pipelines in tests that expect them

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/492




[1/2] incubator-beam git commit: Run Pipelines in tests that expect them

2016-06-21 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master e255cd6be -> 741ef266f


Run Pipelines in tests that expect them


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/916bf3a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/916bf3a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/916bf3a8

Branch: refs/heads/master
Commit: 916bf3a84776c45f679bc27dfbaa6e7eec205ef7
Parents: 69a4141
Author: Thomas Groh 
Authored: Fri Jun 17 16:06:37 2016 -0700
Committer: Thomas Groh 
Committed: Fri Jun 17 16:07:39 2016 -0700

--
 .../org/apache/beam/sdk/coders/SerializableCoderTest.java| 5 +
 .../test/java/org/apache/beam/sdk/transforms/CreateTest.java | 4 
 .../java/org/apache/beam/sdk/util/GatherAllPanesTest.java| 8 
 3 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/916bf3a8/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 98c4d6f..f79f243 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -35,6 +36,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -128,6 +130,7 @@ public class SerializableCoderTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testDefaultCoder() throws Exception {
 Pipeline p = TestPipeline.create();
 
@@ -141,6 +144,8 @@ public class SerializableCoderTest implements Serializable {
 
 PAssert.that(output)
 .containsInAnyOrder("Hello", "World");
+
+p.run();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/916bf3a8/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index e491fea..07ba002 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -151,6 +152,7 @@ public class CreateTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateParameterizedType() throws Exception {
 Pipeline p = TestPipeline.create();
 
@@ -163,6 +165,8 @@ public class CreateTest {
 .containsInAnyOrder(
 TimestampedValue.of("a", new Instant(0)),
 TimestampedValue.of("b", new Instant(0)));
+
+p.run();
   }
   /**
* An unserializable class to demonstrate encoding of elements.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/916bf3a8/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
index 553d589..e9be41e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;

[08/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
new file mode 100644
index 000..f104f6a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Executes a trigger while managing persistence of information about which subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
+ *
+ * Specifically, the responsibilities are:
+ *
+ * <ul>
+ *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
+ *   constructing the appropriate trigger contexts.
+ *   <li>Committing a record of which subtriggers are finished to persistent state.
+ *   <li>Restoring the record of which subtriggers are finished from persistent state.
+ *   <li>Clearing out the persisted finished set when a caller indicates
+ *   (via {@link #clearFinished}) that it is no longer needed.
+ * </ul>
+ *
+ * These responsibilities are intertwined: trigger contexts include mutable information about
+ * which subtriggers are finished. This class provides the information when building the contexts
+ * and commits the information when the method of the {@link ExecutableTrigger} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerRunner<W extends BoundedWindow> {
+  @VisibleForTesting
+  static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
+
+  private final ExecutableTrigger rootTrigger;
+  private final TriggerContextFactory contextFactory;
+
+  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory contextFactory) {
+    Preconditions.checkState(rootTrigger.getTriggerIndex() == 0);
+    this.rootTrigger = rootTrigger;
+    this.contextFactory = contextFactory;
+  }
+
+  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+    if (!isFinishedSetNeeded()) {
+      // If no trigger in the tree will ever have finished bits, then we don't need to read them.
+      // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
+      // finished) for each trigger in the tree.
+      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+    }
+
+    BitSet bitSet = state.read();
+    return bitSet == null
+        ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
+        : FinishedTriggersBitSet.fromBitSet(bitSet);
+  }
+
+
+  private void clearFinishedBits(ValueState state) {
+if (!isFinishedSetNeeded()) {
+  // Nothing to clear.
+  return;
+}
+state.clear();
+  }
+
+  /** Return true if the trigger is closed in the window corresponding to the specified state. */
+  public boolean isClosed(StateAccessor state) {
+    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
+  }
+
+  public void prefetchForValue(W window, StateAccessor state) {
+if (isFinishedSetNeeded()) {
+  state.access(FINISHED_BITS_TAG).readLater();
+}
+rootTrigger.getSpec().p
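The finished-bits bookkeeping that TriggerRunner persists can be sketched generically: one bit per subtrigger, restored from state before trigger methods run and committed back afterwards. A minimal Python sketch (hypothetical names, not the Beam API):

```python
class FinishedBits:
    """Sketch of finished-subtrigger bookkeeping: one bit per subtrigger in
    the trigger tree, restorable from and committable to persistent state."""

    def __init__(self, num_subtriggers, stored_bits=None):
        # Restore from persisted state if present; otherwise all-unfinished,
        # analogous to the all-zero BitSet created when state.read() is null.
        if stored_bits is not None:
            self._bits = list(stored_bits)
        else:
            self._bits = [False] * num_subtriggers

    def is_finished(self, subtrigger_index):
        return self._bits[subtrigger_index]

    def set_finished(self, subtrigger_index):
        self._bits[subtrigger_index] = True

    def persist(self):
        # Snapshot that would be committed back to state after a trigger
        # method returns.
        return list(self._bits)
```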

[jira] [Commented] (BEAM-362) Move shared runner functionality out of SDK and into runners/core-java

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/BEAM-362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343207#comment-15343207 ]

ASF GitHub Bot commented on BEAM-362:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/510


> Move shared runner functionality out of SDK and into runners/core-java
> --
>
> Key: BEAM-362
> URL: https://issues.apache.org/jira/browse/BEAM-362
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[01/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master 73862b422 -> e255cd6be


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
deleted file mode 100644
index 9916c5c..000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.TriggerBuilder;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Test utility that runs a {@link ReduceFn}, {@link WindowFn}, {@link Trigger} using in-memory stub
- * implementations to provide the {@link TimerInternals} and {@link WindowingInternals} needed to
- * run {@code Trigger}s and {@code ReduceFn}s.
- *
- * @param <InputT> The element types.
- * @param <OutputT> The final type for elements in the window (for instance,
- *     {@code Iterable<InputT>})
- * @param <W> The type of windows being used.
- */
-public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
-  private static final String KEY = "TEST_KEY";
-
-  private final TestInMemoryStateInternals<String> stateInternals =
-      new TestInMemoryStateInternals<>(KEY);
-  private final TestTimerInternals timerInternal

[02/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
deleted file mode 100644
index b7ec540..000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ /dev/null
@@ -1,1448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import org.apache.beam.sdk.WindowMatchers;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Tests for {@link ReduceFnRunner}. These tests instantiate a full "stack" of
- * {@link ReduceFnRunner} with enclosed {@link ReduceFn}, down to the installed {@link Trigger}
- * (sometimes mocked). They proceed by injecting elements and advancing watermark and
- * processing time, then verifying produced panes and counters.
- */
-@RunWith(JUnit4.class)
-public class ReduceFnRunnerTest

[04/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
deleted file mode 100644
index 864e8e7..000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ /dev/null
@@ -1,985 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks;
-import org.apache.beam.sdk.util.ReduceFnContextFactory.StateStyle;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
- * {@link PCollection} by key.
- *
- * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
- * the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
- *
- * <ul>
- * <li>Tracking the windows that are active (have buffered data) as elements arrive and
- * triggers are fired.</li>
- * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
- * when the trigger fires.</li>
- * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
- * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
- * such as output.</li>
- * <li>Scheduling garbage collection of state associated with a specific window, and making that
- * happen when the appropriate timer fires.</li>
- * </ul>
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
-
-  /**
-   * The {@link ReduceFnRunner} depends on most aspects of the {@link 
WindowingStrategy}.
-   *
-   * 
-   * It runs the trigger from the {@link WindowingStrategy}.
-   * It merges windows according to the {@link WindowingStrategy}.
-   * It chooses how to track active windows and clear out expired windows
-   * according to the {@link WindowingStrategy}, based on the allowed lateness 
and
-   * whether windows can merge.
-   * It decides whether to emit empty final panes according to whether the
-   * {@link WindowingStrategy} requires it.
-   * It uses discarding or accumulation mode according to the {@link 
WindowingStrategy}.
-   * 
-   */
-  private final WindowingSt
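The responsibilities the `ReduceFnRunner` Javadoc lists above — buffer active windows as elements arrive, emit a pane when the trigger fires, and garbage-collect the window's state afterwards — can be illustrated with a toy runner. This is a heavily simplified sketch: `ToyReduceFnRunner`, string-named windows, and firing the trigger by hand are all invented for illustration; real Beam windows, timers, watermark holds, and accumulation modes are omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of the ReduceFnRunner responsibilities: buffer elements per
// active window, and when the (hand-fired) trigger fires, emit the pane
// and garbage-collect the window's state in one step.
public class ToyReduceFnRunner {
  private final Map<String, List<Integer>> buffers = new HashMap<>();

  public void processElement(String window, int value) {
    buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
  }

  /** Fire the trigger for a window: emit the buffered pane and clear state. */
  public List<Integer> onTrigger(String window) {
    List<Integer> pane = buffers.remove(window); // emit + GC in one step
    return pane == null ? Collections.emptyList() : pane;
  }

  public static void main(String[] args) {
    ToyReduceFnRunner runner = new ToyReduceFnRunner();
    runner.processElement("w1", 1);
    runner.processElement("w1", 2);
    System.out.println(runner.onTrigger("w1")); // [1, 2]
    System.out.println(runner.onTrigger("w1")); // []  (state was cleared)
  }
}
```

The second `onTrigger` call returning an empty pane mirrors why the real runner must decide separately whether empty final panes should be emitted at all.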

[10/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
Move some easy stuff into runners/core-java

This change moves a set of classes with no dependents,
leaving them in the same Java packages for now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0fef8e63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0fef8e63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0fef8e63

Branch: refs/heads/master
Commit: 0fef8e6349216374ef60ef1d3356bdbdcc6f32ee
Parents: efaad32
Author: Kenneth Knowles 
Authored: Mon Jun 20 11:54:20 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 14:09:49 2016 -0700

--
 runners/core-java/pom.xml   |   30 +
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 +
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   75 +
 .../beam/sdk/util/BatchTimerInternals.java  |  140 ++
 .../org/apache/beam/sdk/util/DoFnRunner.java|   62 +
 .../apache/beam/sdk/util/DoFnRunnerBase.java|  558 +++
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  144 ++
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   59 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 ++
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  212 +++
 .../sdk/util/LateDataDroppingDoFnRunner.java|  147 ++
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 ++
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  154 ++
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  115 ++
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  130 ++
 .../beam/sdk/util/ReduceFnContextFactory.java   |  497 ++
 .../apache/beam/sdk/util/ReduceFnRunner.java|  985 
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   56 +
 .../apache/beam/sdk/util/SystemReduceFn.java|  135 ++
 .../org/apache/beam/sdk/util/TriggerRunner.java |  234 +++
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 +++
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  118 ++
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  619 
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  106 ++
 .../util/LateDataDroppingDoFnRunnerTest.java|  117 ++
 .../util/PushbackSideInputDoFnRunnerTest.java   |  234 +++
 .../beam/sdk/util/ReduceFnRunnerTest.java   | 1448 ++
 .../apache/beam/sdk/util/ReduceFnTester.java|  784 ++
 .../beam/sdk/util/SimpleDoFnRunnerTest.java |   86 ++
 .../beam/runners/direct/DirectGroupByKey.java   |2 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java |2 +-
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 -
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   75 -
 .../beam/sdk/util/BatchTimerInternals.java  |  140 --
 .../org/apache/beam/sdk/util/DoFnRunner.java|   62 -
 .../apache/beam/sdk/util/DoFnRunnerBase.java|  558 ---
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  144 --
 .../apache/beam/sdk/util/GatherAllPanes.java|1 -
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   59 -
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 --
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  247 ---
 .../sdk/util/LateDataDroppingDoFnRunner.java|  147 --
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 --
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  154 --
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  115 --
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  130 --
 .../beam/sdk/util/ReduceFnContextFactory.java   |  497 --
 .../apache/beam/sdk/util/ReduceFnRunner.java|  985 
 .../sdk/util/ReifyTimestampsAndWindows.java |   63 +
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   56 -
 .../apache/beam/sdk/util/SystemReduceFn.java|  135 --
 .../org/apache/beam/sdk/util/TriggerRunner.java |  234 ---
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 ---
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  118 --
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  619 
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  106 --
 .../util/LateDataDroppingDoFnRunnerTest.java|  117 --
 .../util/PushbackSideInputDoFnRunnerTest.java   |  234 ---
 .../beam/sdk/util/ReduceFnRunnerTest.java   | 1448 --
 .../apache/beam/sdk/util/ReduceFnTester.java|  784 --
 .../beam/sdk/util/SimpleDoFnRunnerTest.java |   86 --
 62 files changed, 8143 insertions(+), 8086 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/pom.xml
--
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 8ede60b..1587a1a 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -197,7 +197,31 @@
 
 
 
+
+ 

[06/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
--
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
new file mode 100644
index 000..9916c5c
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -0,0 +1,784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.TriggerBuilder;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+/**
+ * Test utility that runs a {@link ReduceFn}, {@link WindowFn}, {@link Trigger} using in-memory stub
+ * implementations to provide the {@link TimerInternals} and {@link WindowingInternals} needed to
+ * run {@code Trigger}s and {@code ReduceFn}s.
+ *
+ * @param <InputT> The element types.
+ * @param <OutputT> The final type for elements in the window (for instance,
+ *     {@code Iterable<InputT>})
+ * @param <W> The type of windows being used.
+ */
+public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
+  private static final String KEY = "TEST_KEY";
+
+  private final TestInMemoryStateInternals<String> stateInternals =
+      new TestInMemoryStateInternals<>(KEY);
+  private final TestTimerInternals timerInternals = new TestTimerInternals();
+
+  private final WindowFn<Object, W> windowFn;
+  private fi

[09/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
new file mode 100644
index 000..b1442dd
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ */
+public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  private Set<BoundedWindow> notReadyWindows;
+
+  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private PushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+   * for each window the element is in that is ready.
+   *
+   * @param elem the element to process in all ready windows
+   * @return each element that could not be processed because it requires a side input window
+   *     that is not ready.
+   */
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    if (views.isEmpty()) {
+      processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      boolean isReady = !notReadyWindows.contains(mainInputWindow);
+      for (PCollectionView<?> view : views) {
+        BoundedWindow sideInputWindow =
+            view.getWindowingStrategyInternal()
+                .getWindowFn()
+                .getSideInputWindow(mainInputWindow);
+        if (!sideInputReader.isReady(view, sideInputWindow)) {
+          isReady = false;
+          break;
+        }
+      }
+      if (isReady) {
+        processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    underlying.processElement(elem);
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#finishBundle()}.
+   */
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+
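The pushback pattern implemented above — process an element in each of its windows whose side inputs are ready, and return the unready ones to be retried later — can be sketched without the Beam types. This is an illustrative simplification: `PushbackSketch`, string-valued windows and views, and the injected readiness predicate (standing in for `ReadyCheckingSideInputReader`) are all invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Sketch of the pushback pattern: process an element in each of its
// windows whose side inputs are all ready, and return the windows that
// had to be pushed back for a later retry.
public class PushbackSketch {
  private final List<String> views;
  private final BiPredicate<String, String> isReady; // (view, window) -> ready?
  final List<String> processed = new ArrayList<>();

  public PushbackSketch(List<String> views, BiPredicate<String, String> isReady) {
    this.views = views;
    this.isReady = isReady;
  }

  /** Returns the windows that could not be processed yet. */
  public List<String> processInReadyWindows(String element, List<String> windows) {
    List<String> pushedBack = new ArrayList<>();
    for (String window : windows) {
      boolean ready = true;
      for (String view : views) {
        if (!isReady.test(view, window)) { ready = false; break; }
      }
      if (ready) {
        processed.add(element + "@" + window); // stands in for underlying.processElement
      } else {
        pushedBack.add(window);
      }
    }
    return pushedBack;
  }

  public static void main(String[] args) {
    // Only window "w1" has its (single) side input ready.
    PushbackSketch runner =
        new PushbackSketch(List.of("v1"), (view, window) -> window.equals("w1"));
    System.out.println(runner.processInReadyWindows("e", List.of("w1", "w2"))); // [w2]
    System.out.println(runner.processed); // [e@w1]
  }
}
```

As in the real runner, an element is never dropped: each per-window copy is either handed to the underlying processor or returned to the caller, which is what makes the retry loop safe.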

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
new file mode 100644
index 000..c5ee1e1
--- /dev/null
+++ b/runners/core-java/src/ma

[07/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
new file mode 100644
index 000..b7ec540
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -0,0 +1,1448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import org.apache.beam.sdk.WindowMatchers;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.Context;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for {@link ReduceFnRunner}. These tests instantiate a full "stack" of
+ * {@link ReduceFnRunner} with enclosed {@link ReduceFn}, down to the installed
+ * {@link Trigger} (sometimes mocked). They proceed by injecting elements and
+ * advancing watermark and processing time, then verifying produced panes and
+ * counters.
+ */
+@RunWith(JUnit4.class)
+public class ReduceFnR

[12/12] incubator-beam git commit: This closes #510

2016-06-21 Thread kenn
This closes #510


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e255cd6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e255cd6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e255cd6b

Branch: refs/heads/master
Commit: e255cd6be4eef7072c05bdf8720a766a2a597ea5
Parents: 73862b4 0fef8e6
Author: Kenneth Knowles 
Authored: Tue Jun 21 19:21:53 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 19:21:53 2016 -0700

--
 runners/core-java/pom.xml   |   30 +
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 +
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   75 +
 .../beam/sdk/util/BatchTimerInternals.java  |  140 ++
 .../org/apache/beam/sdk/util/DoFnRunner.java|   62 +
 .../apache/beam/sdk/util/DoFnRunnerBase.java|  558 +++
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  144 ++
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   59 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 ++
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  212 +++
 .../sdk/util/LateDataDroppingDoFnRunner.java|  147 ++
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 ++
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  154 ++
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  115 ++
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  130 ++
 .../beam/sdk/util/ReduceFnContextFactory.java   |  497 ++
 .../apache/beam/sdk/util/ReduceFnRunner.java|  985 
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   56 +
 .../apache/beam/sdk/util/SystemReduceFn.java|  135 ++
 .../org/apache/beam/sdk/util/TriggerRunner.java |  234 +++
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 +++
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  118 ++
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  619 
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  106 ++
 .../util/LateDataDroppingDoFnRunnerTest.java|  117 ++
 .../util/PushbackSideInputDoFnRunnerTest.java   |  234 +++
 .../beam/sdk/util/ReduceFnRunnerTest.java   | 1448 ++
 .../apache/beam/sdk/util/ReduceFnTester.java|  784 ++
 .../beam/sdk/util/SimpleDoFnRunnerTest.java |   86 ++
 .../beam/runners/direct/DirectGroupByKey.java   |2 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java |2 +-
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 -
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   75 -
 .../beam/sdk/util/BatchTimerInternals.java  |  140 --
 .../org/apache/beam/sdk/util/DoFnRunner.java|   62 -
 .../apache/beam/sdk/util/DoFnRunnerBase.java|  558 ---
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  144 --
 .../apache/beam/sdk/util/GatherAllPanes.java|1 -
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   59 -
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 --
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  247 ---
 .../sdk/util/LateDataDroppingDoFnRunner.java|  147 --
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 --
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  154 --
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  115 --
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  130 --
 .../beam/sdk/util/ReduceFnContextFactory.java   |  497 --
 .../apache/beam/sdk/util/ReduceFnRunner.java|  985 
 .../sdk/util/ReifyTimestampsAndWindows.java |   63 +
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   56 -
 .../apache/beam/sdk/util/SystemReduceFn.java|  135 --
 .../org/apache/beam/sdk/util/TriggerRunner.java |  234 ---
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 ---
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  118 --
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  619 
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  106 --
 .../util/LateDataDroppingDoFnRunnerTest.java|  117 --
 .../util/PushbackSideInputDoFnRunnerTest.java   |  234 ---
 .../beam/sdk/util/ReduceFnRunnerTest.java   | 1448 --
 .../apache/beam/sdk/util/ReduceFnTester.java|  784 --
 .../beam/sdk/util/SimpleDoFnRunnerTest.java |   86 --
 62 files changed, 8143 insertions(+), 8086 deletions(-)
--




[05/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
deleted file mode 100644
index 65fc52d..000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
- * primitive.
- *
- * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
- * <ol>
- *   <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds
- *   the previously-implicit timestamp and window into the elements themselves, so a
- *   window-and-timestamp-unaware transform can operate on them.</li>
- *   <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
- *   and timestamps. Many window-unaware runners have such a primitive already.</li>
- *   <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables
- *   output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
- *   <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
- *   implemented as a {@link ParDo} that calls reserved internal methods.</li>
- * </ol>
- *
- * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
- * transforms are replaced. As-is, it is only applicable for in-memory runners using a
- * batch-style execution strategy. Specifically:
- *
- * <ul>
- *   <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
- *   A streaming-style partition, with multiple elements for the same key, will not yield
- *   correct results.</li>
- *   <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
- *   for large iterables.</li>
- *   <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
- *   appropriate for runners which also do not support timers.</li>
- * </ul>
- */
-public class GroupByKeyViaGroupByKeyOnly<K, V>
-    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
-  private GroupByKey<K, V> gbkTransform;
-
-  public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
-    this.gbkTransform = originalTransform;
-  }
-
-  @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-
-        // Group by just the key.
-        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
-        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
-        // introduced in here.
-        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
-        // Sort each key's values by timestamp. GroupAlsoByWindow requires
-        // its input to be sorted by timestamp.
-        .apply(new 
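As a rough illustration of the multi-step expansion described in the Javadoc above, here is a toy, Beam-independent sketch in plain Java: reify each value's timestamp, group by key only, then sort each key's values by timestamp as `GroupAlsoByWindow` requires. All class and method names here (`GbkSketch`, `Stamped`, `groupAndSort`) are invented for this example and are not Beam's real classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the GroupByKeyViaGroupByKeyOnly expansion:
// (1) reify each value's timestamp, (2) group by key only, ignoring
// timestamps, (3) sort each key's values by timestamp.
public class GbkSketch {

  // Step (1): a value paired with its reified timestamp.
  static final class Stamped<V> {
    final V value;
    final long timestampMillis;

    Stamped(V value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  // Steps (2) and (3). The in-memory sort is exactly the documented
  // limitation: it cannot work for iterables too large to fit in memory.
  static <K, V> Map<K, List<Stamped<V>>> groupAndSort(List<Map.Entry<K, Stamped<V>>> input) {
    Map<K, List<Stamped<V>>> grouped = new HashMap<>();
    for (Map.Entry<K, Stamped<V>> e : input) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    for (List<Stamped<V>> values : grouped.values()) {
      values.sort(Comparator.comparingLong((Stamped<V> s) -> s.timestampMillis));
    }
    return grouped;
  }
}
```

The per-key sort on an in-memory list is what makes the real transform suitable only for batch-style, in-memory runners.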

[11/12] incubator-beam git commit: Directly instantiate SimpleDoFnRunner in its test suite

2016-06-21 Thread kenn
Directly instantiate SimpleDoFnRunner in its test suite

Previously, it went through a static utility class, creating
quasi-circular dependency.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/efaad322
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/efaad322
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/efaad322

Branch: refs/heads/master
Commit: efaad322f1d3c044b7689764c0eba4f58ded718a
Parents: 3001804
Author: Kenneth Knowles 
Authored: Tue Jun 21 10:06:36 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 14:09:49 2016 -0700

--
 .../test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/efaad322/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
index 680422b..fb74fc6 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
@@ -66,7 +66,7 @@ public class SimpleDoFnRunnerTest {
 // Pass in only necessary parameters for the test
 List<TupleTag<String>> sideOutputTags = Arrays.asList();
 StepContext context = mock(StepContext.class);
-return DoFnRunners.simpleRunner(
+return new SimpleDoFnRunner<>(
    null, fn, null, null, null, sideOutputTags, context, null, null);
  }
 



[03/12] incubator-beam git commit: Move some easy stuff into runners/core-java

2016-06-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
deleted file mode 100644
index 14ec082..000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-
-import javax.annotation.Nullable;
-
-/**
- * Implements the logic to hold the output watermark for a computation back
- * until it has seen all the elements it needs based on the input watermark for
- * the computation.
- *
- * <p>The backend ensures the output watermark can never progress beyond the
- * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold'
- * to the output watermark in order to prevent it progressing beyond a time within a window.
- * The hold will be 'cleared' when the associated pane is emitted.
- *
- * <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together
- * and will likely break any other uses.
- *
- * @param <W> The kind of {@link BoundedWindow} the hold is for.
- */
-class WatermarkHold<W extends BoundedWindow> implements Serializable {
-  /**
-   * Return tag for state containing the output watermark hold
-   * used for elements.
-   */
-  public static <W extends BoundedWindow>
-      StateTag<WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
-          OutputTimeFn<? super W> outputTimeFn) {
-    return StateTags.<WatermarkHoldState<W>>makeSystemTagInternal(
-        StateTags.watermarkStateInternal("hold", outputTimeFn));
-  }
-
-  /**
-   * Tag for state containing end-of-window and garbage collection output watermark holds.
-   * (We can't piggy-back on the data hold state since the outputTimeFn may be
-   * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane would
-   * take the end-of-window time as its element time.)
-   */
-  @VisibleForTesting
-  public static final StateTag<WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
-      StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
-          "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
-
-  private final TimerInternals timerInternals;
-  private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateTag<WatermarkHoldState<W>> elementHoldTag;
-
-  public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
-    this.timerInternals = timerInternals;
-    this.windowingStrategy = windowingStrategy;
-    this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
-  }
-
-  /**
-   * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted)
-   * timestamp of the element in {@code context}. We allow the actual hold time to be shifted
-   * later by {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window.
-   * The hold will remain until cleared by {@link #extractAndRelease}. Return the timestamp at
-   * which the hold was placed, or {@literal null} if no hold was placed.
-   *
-   * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
-   * through the window strategy's outp
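The hold semantics sketched in the Javadoc above (holds pin the output watermark, an element's hold may be shifted later but never past the end of its window, and a hold is cleared when its pane is emitted) can be modeled with a minimal toy, assuming plain long timestamps. `HoldSketch` and its method names are invented for this example and are not Beam's `WatermarkHold` API.

```java
import java.util.TreeMap;

// Toy model of watermark-hold logic: the output watermark can never pass
// the earliest outstanding hold, nor the input watermark.
public class HoldSketch {
  private final TreeMap<Long, Integer> holds = new TreeMap<>(); // held timestamp -> count

  // Place a hold for an element. The hold may be shifted later by 'shift'
  // but is capped at the end of the element's window.
  long addHold(long elementTs, long shift, long windowEndTs) {
    long held = Math.min(elementTs + shift, windowEndTs);
    holds.merge(held, 1, Integer::sum);
    return held;
  }

  // Clearing a hold (when the associated pane is emitted) releases it.
  void clearHold(long heldTs) {
    holds.computeIfPresent(heldTs, (k, c) -> c == 1 ? null : c - 1);
  }

  // The output watermark is the minimum of the input watermark and the
  // earliest outstanding hold.
  long outputWatermark(long inputWatermark) {
    return holds.isEmpty() ? inputWatermark : Math.min(inputWatermark, holds.firstKey());
  }
}
```

With no holds outstanding, the output watermark simply tracks the input watermark; each hold pins it back until cleared.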

[GitHub] incubator-beam pull request #510: [BEAM-362] Move some easy stuff into runne...

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/510


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-155) Support asserting the contents of windows and panes in PAssert

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343172#comment-15343172
 ] 

ASF GitHub Bot commented on BEAM-155:
-

GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/518

[BEAM-155] Support Windowed Assertions in PAssert

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---
This allows users to write assertions about the contents of a specific window,
rather than the entire PCollection. This makes assertions more powerful and
lets per-window assertions be written independently of one another, yielding
tests that are easier to comprehend.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam multi_pane_passert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #518


commit 28338a6de04609b6dcee3b68abbbe0424552d5e1
Author: Thomas Groh 
Date:   2016-06-17T22:19:26Z

Key with integers in GatherAllPanes

Ensures that runners which do not support null values can handle
GatherAllPanes.

commit 3972260442f52b8d4d806426d3cbcfc2252b6c49
Author: Thomas Groh 
Date:   2016-06-20T21:38:11Z

Use GatherAllPanes in PAssert

Instead of explicitly grouping by key, gather all the panes across the
input window.

commit 6ec58a8a074f8b04bd64776c0eee4a5bd0d0c4ae
Author: Thomas Groh 
Date:   2016-06-20T21:39:54Z

Update test for GatherAllPanes

commit b1b14f73cd51b4b409ca22e46cc22b3d916c905c
Author: Thomas Groh 
Date:   2016-06-21T17:44:43Z

Add StaticWindows

This is a WindowFn that ignores the input and always assigns to the same
set of windows. It returns the provided window as the side input window if
and only if that window is present within its set of windows.




> Support asserting the contents of windows and panes in PAssert
> --
>
> Key: BEAM-155
> URL: https://issues.apache.org/jira/browse/BEAM-155
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>
> This consists of reifying the output windows and panes, and running asserts 
> per-window about the contents of panes.
> This includes aggregated matching and final pane matching, e.g.
> PAssert.that(output).byOnTimePane().hasOutputElements(foo, bar);
> // For discarding mode - could have emitted (say) [spam, eggs], [spam], [], 
> [sausage], []
> PAssert.that(output).byFinalPane().hasOutputElements(spam, eggs, sausage, 
> spam);
> // For accumulating mode without late data
> PAssert.that(output).finalPane().containsInAnyOrder(spam, eggs, sausage, 
> spam);
> // For accumulating mode with late data
> PAssert.that(output).finalPane().containsInAnyOrder(foo, 
> bar).mayAlsoContain(baz, rab);
> See also: 
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit#
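The core idea behind the proposal, reify each element's window and pane and then assert on one pane of one window independently of the rest of the collection, can be sketched without Beam at all. The names below (`Reified`, `onTimePane`) are illustrative only and are not the proposed PAssert API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy sketch of windowed assertions: bucket reified elements by window,
// then inspect one pane's contents in isolation.
public class WindowedAssertSketch {
  static final class Reified {
    final String element;
    final String window;
    final boolean onTime; // whether the element arrived in the on-time pane

    Reified(String element, String window, boolean onTime) {
      this.element = element;
      this.window = window;
      this.onTime = onTime;
    }
  }

  // Collect the on-time-pane contents of a single window, ignoring
  // every other window and every late pane.
  static Set<String> onTimePane(List<Reified> all, String window) {
    Set<String> pane = new HashSet<>();
    for (Reified r : all) {
      if (r.window.equals(window) && r.onTime) {
        pane.add(r.element);
      }
    }
    return pane;
  }
}
```

An assertion on `onTimePane(all, "w1")` says nothing about window `w2` or about late panes, which is exactly the independence the ticket asks for.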



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-beam pull request #518: [BEAM-155] Support Windowed Assertions in ...

2016-06-21 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/518

[BEAM-155] Support Windowed Assertions in PAssert


---
This allows users to write assertions about the contents of a specific window,
rather than the entire PCollection. This makes assertions more powerful and
lets per-window assertions be written independently of one another, yielding
tests that are easier to comprehend.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam multi_pane_passert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #518


commit 28338a6de04609b6dcee3b68abbbe0424552d5e1
Author: Thomas Groh 
Date:   2016-06-17T22:19:26Z

Key with integers in GatherAllPanes

Ensures that runners which do not support null values can handle
GatherAllPanes.

commit 3972260442f52b8d4d806426d3cbcfc2252b6c49
Author: Thomas Groh 
Date:   2016-06-20T21:38:11Z

Use GatherAllPanes in PAssert

Instead of explicitly grouping by key, gather all the panes across the
input window.

commit 6ec58a8a074f8b04bd64776c0eee4a5bd0d0c4ae
Author: Thomas Groh 
Date:   2016-06-20T21:39:54Z

Update test for GatherAllPanes

commit b1b14f73cd51b4b409ca22e46cc22b3d916c905c
Author: Thomas Groh 
Date:   2016-06-21T17:44:43Z

Add StaticWindows

This is a WindowFn that ignores the input and always assigns to the same
set of windows. It returns the provided window as the side input window if
and only if that window is present within its set of windows.






[jira] [Commented] (BEAM-368) Add getSplitPointsConsumed() to ByteKeyRangeTracker

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343143#comment-15343143
 ] 

ASF GitHub Bot commented on BEAM-368:
-

GitHub user ianzhou1 opened a pull request:

https://github.com/apache/incubator-beam/pull/517

[BEAM-368] Added getSplitPointsConsumed() to ByteKeyRangeTracker


---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ianzhou1/incubator-beam 
ByteKeyRangeTrackerAddSplitPointsConsumed

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #517






> Add getSplitPointsConsumed() to ByteKeyRangeTracker
> ---
>
> Key: BEAM-368
> URL: https://issues.apache.org/jira/browse/BEAM-368
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Ian Zhou
>Assignee: Daniel Halperin
>Priority: Minor
>
> ByteKeyRangeTracker should keep track of the number of split points seen and 
> the number of split points consumed.





[jira] [Created] (BEAM-368) Add getSplitPointsConsumed() to ByteKeyRangeTracker

2016-06-21 Thread Ian Zhou (JIRA)
Ian Zhou created BEAM-368:
-

 Summary: Add getSplitPointsConsumed() to ByteKeyRangeTracker
 Key: BEAM-368
 URL: https://issues.apache.org/jira/browse/BEAM-368
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-gcp
Reporter: Ian Zhou
Assignee: Daniel Halperin
Priority: Minor


ByteKeyRangeTracker should keep track of the number of split points seen and 
the number of split points consumed.
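A minimal sketch of the requested bookkeeping, assuming the tracker learns of each split point in order: the previous split point is counted as consumed once the reader moves on to the next one. The names here (`SplitPointCounter`, `recordSplitPoint`) are hypothetical, not the actual ByteKeyRangeTracker API.

```java
// Toy counter for split points seen and split points fully consumed.
public class SplitPointCounter {
  private long seen = 0;
  private long consumed = 0;
  private boolean started = false;

  // Called when the reader arrives at a new split point. The previous
  // split point, if any, is now fully consumed.
  void recordSplitPoint() {
    if (started) {
      consumed++;
    }
    started = true;
    seen++;
  }

  long getSplitPointsSeen() {
    return seen;
  }

  long getSplitPointsConsumed() {
    return consumed;
  }
}
```

The last split point observed stays "in progress", so consumed always lags seen by one while reading.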





[GitHub] incubator-beam pull request #517: [BEAM-368] Added getSplitPointsConsumed() ...

2016-06-21 Thread ianzhou1
GitHub user ianzhou1 opened a pull request:

https://github.com/apache/incubator-beam/pull/517

[BEAM-368] Added getSplitPointsConsumed() to ByteKeyRangeTracker


---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ianzhou1/incubator-beam 
ByteKeyRangeTrackerAddSplitPointsConsumed

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #517








[GitHub] incubator-beam pull request #516: Enable dependency-plugin at global level.

2016-06-21 Thread peihe
GitHub user peihe opened a pull request:

https://github.com/apache/incubator-beam/pull/516

Enable dependency-plugin at global level.

Create the PR to trigger tests.
It is not ready for review yet, since it will likely break the build.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/peihe/incubator-beam fix-spark-dep

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/516.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #516


commit 3809d314f1c06094070d071c836c8fec09334963
Author: Pei He 
Date:   2016-06-21T22:25:06Z

Enable dependency-plugin at global level.






[jira] [Commented] (BEAM-91) Retractions

2016-06-21 Thread Matt Pouttu-Clarke (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343104#comment-15343104
 ] 

Matt Pouttu-Clarke commented on BEAM-91:


https://github.com/LamdaFu/bloklinx/wiki
^^ provides the basic description
https://github.com/LamdaFu/bloklinx/wiki/Bloklinx-Schema-(flatbuffers)
^^ provides definition of the basic mapping to flatbuffer serialization

I am working on examples of basic versioning as well as branching / merge / 
change data processing

The question I get asked most is why an "UPDATE" is composed of a redaction 
followed by an assertion rather than just one record.  The answer is that this 
provides several huge benefits including very efficient refresh of downstream 
aggregations, splits, merges, easier data diff, and much easier reconciliation. 
This will become clear in my subsequent examples.
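The redaction-plus-assertion update described above can be modeled as weighted change records, which is what makes downstream aggregate refresh cheap. A minimal sketch (illustrative only, not taken from the Bloklinx wiki):

```python
def apply_changes(aggregate, changes):
    """Fold (weight, value) change records into a running sum.

    A redaction carries weight -1 and an assertion weight +1, so an
    UPDATE is the pair (redact old, assert new) and the downstream
    aggregate is refreshed incrementally, never rebuilt from scratch.
    """
    for weight, value in changes:
        aggregate += weight * value
    return aggregate

total = apply_changes(0, [(+1, 10), (+1, 5)])       # initial assertions
total = apply_changes(total, [(-1, 10), (+1, 12)])  # UPDATE: 10 -> 12
print(total)  # 17
```

The same two-record shape also makes data diff and reconciliation mechanical: comparing two change logs is comparing multisets of (weight, value) records.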

> Retractions
> ---
>
> Key: BEAM-91
> URL: https://issues.apache.org/jira/browse/BEAM-91
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Tyler Akidau
>Assignee: Frances Perry
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> We still haven't added retractions to Beam, even though they're a core part 
> of the model. We should document all the necessary aspects (uncombine, 
> reverting DoFn output with DoOvers, sink integration, source-level 
> retractions, etc), and then implement them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-367) GetFractionConsumed() inaccurate for non-uniform records

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343099#comment-15343099
 ] 

ASF GitHub Bot commented on BEAM-367:
-

GitHub user ianzhou1 opened a pull request:

https://github.com/apache/incubator-beam/pull/515

[BEAM-367] Modify offset range tracker to use first response as start offset

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ianzhou1/incubator-beam 
OffsetRangeTrackerUpdateStartOffset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/515.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #515


commit d9f11ad88d428feefba50faa8f64b1912da97362
Author: Ian Zhou 
Date:   2016-06-22T00:23:09Z

Modify offset range tracker to use first response as start offset




> GetFractionConsumed() inaccurate for non-uniform records
> 
>
> Key: BEAM-367
> URL: https://issues.apache.org/jira/browse/BEAM-367
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Ian Zhou
>Assignee: Daniel Halperin
>Priority: Minor
>
> GetFractionConsumed() provides inaccurate progress updates for clustered 
> records. For example, for a range spanning [1, 10], a cluster of records 
> around 5 (e.g. 5.01 ..., 5.09) will be recorded as ~50% complete upon 
> reading the first record, and will remain at this percentage until the final 
> record has been read. Instead, the start of the range should be changed to 
> the first record seen (e.g. new range [5.01, 10]). The end of the range 
> can be changed over time through dynamic work rebalancing.





[jira] [Created] (BEAM-367) GetFractionConsumed() inaccurate for non-uniform records

2016-06-21 Thread Ian Zhou (JIRA)
Ian Zhou created BEAM-367:
-

 Summary: GetFractionConsumed() inaccurate for non-uniform records
 Key: BEAM-367
 URL: https://issues.apache.org/jira/browse/BEAM-367
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-gcp
Reporter: Ian Zhou
Assignee: Daniel Halperin
Priority: Minor


GetFractionConsumed() provides inaccurate progress updates for clustered 
records. For example, for a range spanning [1, 10], a cluster of records around 
5 (e.g. 5.01 ..., 5.09) will be recorded as ~50% complete upon reading 
the first record, and will remain at this percentage until the final record has 
been read. Instead, the start of the range should be changed to the first 
record seen (e.g. new range [5.01, 10]). The end of the range can be 
changed over time through dynamic work rebalancing.
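A hedged sketch of why the linear estimate sticks and how narrowing the range start helps (function name hypothetical, not the actual OffsetRangeTracker code):

```python
def fraction_consumed(start, end, current):
    """Linear progress estimate of `current` within [start, end]."""
    return (current - start) / float(end - start)

# Range [1, 10] with records clustered near 5: every record in the
# cluster reports roughly the same ~45% progress.
stuck = fraction_consumed(1, 10, 5.05)

# Narrowing the start to the first record actually seen spreads the
# cluster across the whole unit interval instead.
adaptive = fraction_consumed(5.01, 10, 5.05)

print(round(stuck, 2), round(adaptive, 3))  # 0.45 0.008
```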





[GitHub] incubator-beam pull request #515: [BEAM-367] Modify offset range tracker to ...

2016-06-21 Thread ianzhou1
GitHub user ianzhou1 opened a pull request:

https://github.com/apache/incubator-beam/pull/515

[BEAM-367] Modify offset range tracker to use first response as start offset

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ianzhou1/incubator-beam 
OffsetRangeTrackerUpdateStartOffset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/515.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #515


commit d9f11ad88d428feefba50faa8f64b1912da97362
Author: Ian Zhou 
Date:   2016-06-22T00:23:09Z

Modify offset range tracker to use first response as start offset






[GitHub] incubator-beam pull request #514: Add Travis config for Python SDK tests

2016-06-21 Thread aaltay
GitHub user aaltay opened a pull request:

https://github.com/apache/incubator-beam/pull/514

Add Travis config for Python SDK tests

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---
Adds a basic Travis config for installing and running tests with tox. tox 
is only configured to setup.py test. This will provide a basic level of 
presubmit coverage.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aaltay/incubator-beam travis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/514.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #514


commit f9cf5e9313dc5051575a0596a6fac30740358812
Author: Ahmet Altay 
Date:   2016-06-21T21:18:24Z

Travis config for python tests

commit 930690afec297fd3187d6a79e1b008392eedaefb
Author: Ahmet Altay 
Date:   2016-06-21T21:39:53Z

update tox path for osx

commit 83f6568fa77d90e4e5147954161c297fd3454a88
Author: Ahmet Altay 
Date:   2016-06-21T23:19:53Z

add back maven override for osx

commit e11d28f6ad105bd9e4ab5bade3de9c5b99fdea45
Author: Ahmet Altay 
Date:   2016-06-21T23:47:55Z

add license to tox.ini






[GitHub] incubator-beam pull request #512: Use item equality in apply_to_list test

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/512




[1/2] incubator-beam git commit: Use item equality in apply_to_list test

2016-06-21 Thread bchambers
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk f166b16b8 -> 4840f5275


Use item equality in apply_to_list test

Orders of PCollections are not guaranteed and this test depends on
the current DirectRunner implementation for output to have the same
order as input.
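The order-insensitive check the test switches to can be sketched with multisets; this mirrors the semantics of unittest's assertItemsEqual (named assertCountEqual on Python 3), assuming hashable elements:

```python
from collections import Counter

def items_equal(expected, actual):
    """True when both sequences hold the same elements with the same
    multiplicities, regardless of order (hashable elements assumed)."""
    return Counter(expected) == Counter(actual)

print([1, 2, 3] == [3, 1, 2])             # False: assertEqual is order-sensitive
print(items_equal([1, 2, 3], [3, 1, 2]))  # True: same contents, any order
print(items_equal([1, 2], [1, 2, 2]))     # False: multiplicities differ
```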


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0608ad94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0608ad94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0608ad94

Branch: refs/heads/python-sdk
Commit: 0608ad94758afe8caebb5f1d8472f89718668542
Parents: f166b16
Author: Ahmet Altay 
Authored: Tue Jun 21 11:05:39 2016 -0700
Committer: Ahmet Altay 
Committed: Tue Jun 21 11:05:39 2016 -0700

--
 sdks/python/apache_beam/transforms/ptransform_test.py | 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0608ad94/sdks/python/apache_beam/transforms/ptransform_test.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py 
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 401bcb4..3244473 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -510,14 +510,15 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   def test_apply_to_list(self):
-    self.assertEqual([1, 2, 3], [0, 1, 2] | beam.Map('add_one', lambda x: x + 1))
-    self.assertEqual([1], [0, 1, 2] | beam.Filter('odd', lambda x: x % 2))
-    self.assertEqual([1, 2, 3, 100],
-                     ([1, 2, 3], [100]) | beam.Flatten('flat'))
+    self.assertItemsEqual(
+        [1, 2, 3], [0, 1, 2] | beam.Map('add_one', lambda x: x + 1))
+    self.assertItemsEqual([1], [0, 1, 2] | beam.Filter('odd', lambda x: x % 2))
+    self.assertItemsEqual([1, 2, 100, 3],
+                          ([1, 2, 3], [100]) | beam.Flatten('flat'))
     join_input = ([('k', 'a')],
                   [('k', 'b'), ('k', 'c')])
-    self.assertEqual([('k', (['a'], ['b', 'c']))],
-                     join_input | beam.CoGroupByKey('join'))
+    self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
+                          join_input | beam.CoGroupByKey('join'))
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):



[2/2] incubator-beam git commit: This closes #512

2016-06-21 Thread bchambers
This closes #512


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4840f527
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4840f527
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4840f527

Branch: refs/heads/python-sdk
Commit: 4840f5275c569c190088179cb5dee6380ae641c9
Parents: f166b16 0608ad9
Author: bchambers 
Authored: Tue Jun 21 15:20:27 2016 -0700
Committer: bchambers 
Committed: Tue Jun 21 15:20:27 2016 -0700

--
 sdks/python/apache_beam/transforms/ptransform_test.py | 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--




[jira] [Commented] (BEAM-359) AvroCoder should be able to handle anonymous classes as schemas

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342676#comment-15342676
 ] 

ASF GitHub Bot commented on BEAM-359:
-

GitHub user bjchambers opened a pull request:

https://github.com/apache/incubator-beam/pull/513

Fix BEAM-359

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [*] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [*] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [*] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [*] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

Rather than throwing an exception when checking determinism of an erased
generic, record that as a non-deterministic class and proceed.
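The record-and-proceed pattern described above can be sketched as follows (hypothetical shape, not the actual AvroCoder internals):

```python
def check_determinism(schema, reasons=None, path="root"):
    """Collect non-determinism reasons instead of raising on the first one."""
    reasons = [] if reasons is None else reasons
    if schema.get("type") is None:
        # An erased generic leaves no type information behind; record it
        # as non-deterministic and keep checking the rest of the schema.
        reasons.append("%s: erased generic, treated as non-deterministic" % path)
        return reasons
    for i, field in enumerate(schema.get("fields", [])):
        check_determinism(field, reasons, "%s.field%d" % (path, i))
    return reasons

schema = {"type": "record",
          "fields": [{"type": "string"}, {"type": None}]}
print(check_determinism(schema))
# ['root.field1: erased generic, treated as non-deterministic']
```

The design choice is that callers get a complete list of reasons a schema is non-deterministic, rather than failing fast on the first erased type they encounter.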

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bjchambers/incubator-beam beam-359-avrocode.r

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #513


commit 0e656f4df4371fdf6da181ddb984027bce7ec72d
Author: bchambers 
Date:   2016-06-21T20:42:33Z

Fix BEAM-359

Rather than throwing an exception when checking determinism of an erased
generic, record that as a non-deterministic class and proceed.




> AvroCoder should be able to handle anonymous classes as schemas
> ---
>
> Key: BEAM-359
> URL: https://issues.apache.org/jira/browse/BEAM-359
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 0.1.0-incubating, 0.2.0-incubating
>Reporter: Daniel Mills
>Assignee: Ben Chambers
>
> Currently, the determinism checker NPEs with:
> java.lang.IllegalArgumentException: Unable to get field id from class null
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.getField(AvroCoder.java:710)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:548)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:567)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:430)
> at 
> com.google.cloud.dataflow.sdk.coders.AvroCoder.(AvroCoder.java:189)
> at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:144)
> at mypackage.GenericsTest$1.create(GenericsTest.java:102)
> at 
> com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoderFromFactory(CoderRegistry.java:797)
> at 
> com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:748)
> at 
> com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:719)
> at 
> com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:696)
> at 
> com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:178)
> at 
> com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:147)
> at 
> com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)





[GitHub] incubator-beam pull request #513: Fix BEAM-359

2016-06-21 Thread bjchambers
GitHub user bjchambers opened a pull request:

https://github.com/apache/incubator-beam/pull/513

Fix BEAM-359

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [*] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [*] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [*] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [*] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

Rather than throwing an exception when checking determinism of an erased
generic, record that as a non-deterministic class and proceed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bjchambers/incubator-beam beam-359-avrocode.r

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/513.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #513


commit 0e656f4df4371fdf6da181ddb984027bce7ec72d
Author: bchambers 
Date:   2016-06-21T20:42:33Z

Fix BEAM-359

Rather than throwing an exception when checking determinism of an erased
generic, record that as a non-deterministic class and proceed.






Jenkins build is back to stable : beam_PostCommit_RunnableOnService_GoogleCloudDataflow #587

2016-06-21 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_RunnableOnService_GoogleCloudDataflow #586

2016-06-21 Thread Apache Jenkins Server
See 




[3/3] incubator-beam git commit: This closes #509

2016-06-21 Thread kenn
This closes #509


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73862b42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73862b42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73862b42

Branch: refs/heads/master
Commit: 73862b422130b35dfc5f012dbb55e1a9a62be55c
Parents: 3001804 b52688f
Author: Kenneth Knowles 
Authored: Tue Jun 21 11:54:40 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 11:54:40 2016 -0700

--
 runners/google-cloud-dataflow-java/pom.xml  |  1 +
 .../util/GroupAlsoByWindowViaWindowSetDoFn.java | 40 
 2 files changed, 1 insertion(+), 40 deletions(-)
--




[GitHub] incubator-beam pull request #509: Set runtime scope for Dataflow runner dep ...

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/509




[2/3] incubator-beam git commit: Remove temporary GroupAlsoByWindowViaWindowSetDoFn shim

2016-06-21 Thread kenn
Remove temporary GroupAlsoByWindowViaWindowSetDoFn shim


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2d563795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2d563795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2d563795

Branch: refs/heads/master
Commit: 2d563795b9ec3ec70f53980afa5bb2a50fd24518
Parents: 2b9906e
Author: Kenneth Knowles 
Authored: Tue Jun 21 10:21:34 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 10:21:53 2016 -0700

--
 .../util/GroupAlsoByWindowViaWindowSetDoFn.java | 40 
 1 file changed, 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2d563795/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
deleted file mode 100644
index 42ebf94..000
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Stub class that exists for compatibility with code expecting
- * {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn} in the old location.
- */
-public class GroupAlsoByWindowViaWindowSetDoFn {
-
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
-      WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    return org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn.create(
-        strategy, reduceFn);
-  }
-
-  /** Prohibit instantiation. */
-  private GroupAlsoByWindowViaWindowSetDoFn() {}
-}
-



[1/3] incubator-beam git commit: Set runtime scope for Dataflow runner dep on runners-core

2016-06-21 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3001804e3 -> 73862b422


Set runtime scope for Dataflow runner dep on runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b52688f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b52688f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b52688f4

Branch: refs/heads/master
Commit: b52688f4e41e0130ac5947e7771ad8606aa9913c
Parents: 2d56379
Author: Kenneth Knowles 
Authored: Mon Jun 20 15:05:59 2016 -0700
Committer: Kenneth Knowles 
Committed: Tue Jun 21 10:21:53 2016 -0700

--
 runners/google-cloud-dataflow-java/pom.xml | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b52688f4/runners/google-cloud-dataflow-java/pom.xml
--
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index 6d8e94b..999e16d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -299,6 +299,7 @@
 
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-core-java</artifactId>
+      <scope>runtime</scope>
 
 
 



[jira] [Resolved] (BEAM-234) Remove the word Pipeline from the name of all PipelineRunner implementations

2016-06-21 Thread Thomas Groh (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Groh resolved BEAM-234.
--
Resolution: Fixed

> Remove the word Pipeline from the name of all PipelineRunner implementations
> 
>
> Key: BEAM-234
> URL: https://issues.apache.org/jira/browse/BEAM-234
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow, runner-direct, runner-flink, 
> runner-spark
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>
> The fact that a PipelineRunner runs a Pipeline is provided by its 
> implementation of the PipelineRunner abstract class, so the inclusion of 
> "Pipeline" in the name just makes it inconvenient to type.





[jira] [Commented] (BEAM-234) Remove the word Pipeline from the name of all PipelineRunner implementations

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342364#comment-15342364
 ] 

ASF GitHub Bot commented on BEAM-234:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/488


> Remove the word Pipeline from the name of all PipelineRunner implementations
> 
>
> Key: BEAM-234
> URL: https://issues.apache.org/jira/browse/BEAM-234
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow, runner-direct, runner-flink, 
> runner-spark
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>
> The fact that a PipelineRunner runs a Pipeline is provided by its 
> implementation of the PipelineRunner abstract class, so the inclusion of 
> "Pipeline" in the name just makes it inconvenient to type.





[2/2] incubator-beam git commit: Closes #488

2016-06-21 Thread dhalperi
Closes #488


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3001804e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3001804e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3001804e

Branch: refs/heads/master
Commit: 3001804e3cb33d2d399f64e3c88308e136321782
Parents: 2b9906e 96ffc42
Author: Dan Halperin 
Authored: Tue Jun 21 11:14:58 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 11:14:58 2016 -0700

--
 README.md   |   2 +-
 runners/spark/README.md |   8 +-
 runners/spark/pom.xml   |   2 +-
 .../beam/runners/spark/SparkPipelineRunner.java | 255 ---
 .../apache/beam/runners/spark/SparkRunner.java  | 255 +++
 .../runners/spark/SparkRunnerRegistrar.java |   8 +-
 .../runners/spark/TestSparkPipelineRunner.java  |  77 --
 .../beam/runners/spark/TestSparkRunner.java |  77 ++
 .../translation/SparkPipelineEvaluator.java |   6 +-
 .../StreamingWindowPipelineDetector.java|   6 +-
 .../apache/beam/runners/spark/DeDupTest.java|   4 +-
 .../beam/runners/spark/EmptyInputTest.java  |   4 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +-
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../apache/beam/runners/spark/TfIdfTest.java|   4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java|   6 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   6 +-
 .../spark/translation/CombinePerKeyTest.java|   4 +-
 .../spark/translation/DoFnOutputTest.java   |   6 +-
 .../translation/MultiOutputWordCountTest.java   |   4 +-
 .../spark/translation/SerializationTest.java|   6 +-
 .../spark/translation/SideEffectsTest.java  |   4 +-
 .../translation/TransformTranslatorTest.java|   6 +-
 .../translation/WindowedWordCountTest.java  |   8 +-
 .../streaming/FlattenStreamingTest.java |   6 +-
 .../streaming/KafkaStreamingTest.java   |   6 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 29 files changed, 397 insertions(+), 397 deletions(-)
--




[GitHub] incubator-beam pull request #488: [BEAM-234] Rename SparkPipelineRunner to S...

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/488




[1/2] incubator-beam git commit: Rename SparkPipelineRunner to SparkRunner

2016-06-21 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b9906e8d -> 3001804e3


Rename SparkPipelineRunner to SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96ffc429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96ffc429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96ffc429

Branch: refs/heads/master
Commit: 96ffc42972010c9b027e826d6d610555ab0c055a
Parents: 2b9906e
Author: Thomas Groh 
Authored: Fri Jun 17 10:45:09 2016 -0700
Committer: Dan Halperin 
Committed: Tue Jun 21 11:14:57 2016 -0700

--
 README.md   |   2 +-
 runners/spark/README.md |   8 +-
 runners/spark/pom.xml   |   2 +-
 .../beam/runners/spark/SparkPipelineRunner.java | 255 ---
 .../apache/beam/runners/spark/SparkRunner.java  | 255 +++
 .../runners/spark/SparkRunnerRegistrar.java |   8 +-
 .../runners/spark/TestSparkPipelineRunner.java  |  77 --
 .../beam/runners/spark/TestSparkRunner.java |  77 ++
 .../translation/SparkPipelineEvaluator.java |   6 +-
 .../StreamingWindowPipelineDetector.java|   6 +-
 .../apache/beam/runners/spark/DeDupTest.java|   4 +-
 .../beam/runners/spark/EmptyInputTest.java  |   4 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +-
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../apache/beam/runners/spark/TfIdfTest.java|   4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java|   6 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   6 +-
 .../spark/translation/CombinePerKeyTest.java|   4 +-
 .../spark/translation/DoFnOutputTest.java   |   6 +-
 .../translation/MultiOutputWordCountTest.java   |   4 +-
 .../spark/translation/SerializationTest.java|   6 +-
 .../spark/translation/SideEffectsTest.java  |   4 +-
 .../translation/TransformTranslatorTest.java|   6 +-
 .../translation/WindowedWordCountTest.java  |   8 +-
 .../streaming/FlattenStreamingTest.java |   6 +-
 .../streaming/KafkaStreamingTest.java   |   6 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 29 files changed, 397 insertions(+), 397 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/README.md
--
diff --git a/README.md b/README.md
index c4a9155..ec89c4d 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Beam supports executing programs on multiple distributed 
processing backends thr
 - The `DirectRunner` runs the pipeline on your local machine.
 - The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud 
Dataflow](http://cloud.google.com/dataflow/).
 - The `FlinkRunner` runs the pipeline on an Apache Flink cluster. The code has 
been donated from 
[dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) 
and is now part of Beam.
-- The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. The 
code has been donated from 
[cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is 
now part of Beam.
+- The `SparkRunner` runs the pipeline on an Apache Spark cluster. The code has 
been donated from 
[cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is 
now part of Beam.
 
 Have ideas for new Runners? See the 
[Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/README.md
--
diff --git a/runners/spark/README.md b/runners/spark/README.md
index 457f0a9..d2bfd3e 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -89,7 +89,7 @@ If we wanted to run a Beam pipeline with the default options 
of a single threade
 instance in local mode, we would do the following:
 
 Pipeline p = 
-EvaluationResult result = SparkPipelineRunner.create().run(p);
+EvaluationResult result = SparkRunner.create().run(p);
 
 To create a pipeline runner to run against a different Spark cluster, with a 
custom master url we
 would do the following:
@@ -97,7 +97,7 @@ would do the following:
 Pipeline p = 
 SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
 options.setSparkMaster("spark://host:port");
-EvaluationResult result = SparkPipelineRunner.create(options).run(p);
+EvaluationResult result = SparkRunner.create(options).run(p);
 
 ## Word Count Example
 
@@ -113,7 +113,7 @@ Then run the [word count example][wc] from the SDK using a

[GitHub] incubator-beam pull request #512: Use item equality in apply_to_list test

2016-06-21 Thread aaltay
GitHub user aaltay opened a pull request:

https://github.com/apache/incubator-beam/pull/512

Use item equality in apply_to_list test

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

The order of elements in a PCollection is not guaranteed; this test
depends on the current DirectRunner implementation producing output in
the same order as the input.
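A minimal sketch of the idea behind this fix — comparing pipeline output as a
multiset rather than as an ordered list — using Python's `collections.Counter`.
The helper name and sample data here are hypothetical, not the SDK's actual
test utilities:

```python
from collections import Counter

def assert_items_equal(actual, expected):
    # Compare as multisets: element counts must match; order is irrelevant.
    assert Counter(actual) == Counter(expected), (actual, expected)

# A runner is free to reorder a PCollection, so an ordered comparison
# is fragile even when the pipeline is correct.
expected = ["hi", "there", "hi"]
runner_output = ["there", "hi", "hi"]  # same items, different order

assert runner_output != expected             # ordered equality would fail
assert_items_equal(runner_output, expected)  # item equality passes
```

Note that `Counter` also catches duplicate-count mismatches, which a plain
`set` comparison would miss.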

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aaltay/incubator-beam pttest2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #512


commit 0608ad94758afe8caebb5f1d8472f89718668542
Author: Ahmet Altay 
Date:   2016-06-21T18:05:39Z

Use item equality in apply_to_list test

The order of elements in a PCollection is not guaranteed; this test
depends on the current DirectRunner implementation producing output in
the same order as the input.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #494: Use item equality in apply_to_list test

2016-06-21 Thread aaltay
Github user aaltay closed the pull request at:

https://github.com/apache/incubator-beam/pull/494




[GitHub] incubator-beam pull request #511: Fix a typo in FlinkRunner.toString

2016-06-21 Thread peihe
GitHub user peihe opened a pull request:

https://github.com/apache/incubator-beam/pull/511

Fix a typo in FlinkRunner.toString

R: @tgroh @kennknowles 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/peihe/incubator-beam patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/511.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #511


commit a43aa7afc28c6f4e4daf014025cb6bc960badc99
Author: peihe 
Date:   2016-06-21T18:05:46Z

Fix a typo in FlinkRunner.toString






[jira] [Commented] (BEAM-91) Retractions

2016-06-21 Thread Matt Pouttu-Clarke (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342259#comment-15342259
 ] 

Matt Pouttu-Clarke commented on BEAM-91:


Yes, just wanted to be sure there was interest before documenting it.  FYI: it 
requires some form of distributed configuration service such as etcd or 
zookeeper to keep track of in-process change sessions.  Once the change 
sessions are done or "committed" (or time out), they are cleared from the 
config service but can be obtained from logs for later replays.  Also, in terms 
of granularity of change sessions, a large number of change sessions making 
very small changes can cause problems for the design and should be throttled at 
the client side. I'll post a link here to the documentation.
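A toy in-memory sketch of the change-session bookkeeping described above — an
etcd/ZooKeeper stand-in tracking in-process sessions that are cleared on commit
or timeout but kept in a log for replay. All names and the registry design are
hypothetical illustrations, not an actual Beam API:

```python
import time

class ChangeSessionRegistry:
    """Toy stand-in for a distributed config service (e.g. etcd/ZooKeeper)."""

    def __init__(self, timeout_s=60.0):
        self.timeout_s = timeout_s
        self.open_sessions = {}   # session_id -> start time
        self.log = []             # committed/expired sessions, kept for replay

    def begin(self, session_id):
        self.open_sessions[session_id] = time.monotonic()

    def commit(self, session_id):
        # Done sessions are cleared from the registry but stay in the log.
        self.open_sessions.pop(session_id, None)
        self.log.append(("committed", session_id))

    def expire_stale(self):
        now = time.monotonic()
        for sid, started in list(self.open_sessions.items()):
            if now - started > self.timeout_s:
                del self.open_sessions[sid]
                self.log.append(("timed_out", sid))

# Negative timeout forces immediate expiry, purely for this demo.
reg = ChangeSessionRegistry(timeout_s=-1.0)
reg.begin("s1")
reg.commit("s1")
reg.begin("s2")
reg.expire_stale()  # "s2" exceeds the (negative) timeout and is cleared
assert reg.open_sessions == {}
assert reg.log == [("committed", "s1"), ("timed_out", "s2")]
```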

> Retractions
> ---
>
> Key: BEAM-91
> URL: https://issues.apache.org/jira/browse/BEAM-91
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Tyler Akidau
>Assignee: Frances Perry
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> We still haven't added retractions to Beam, even though they're a core part 
> of the model. We should document all the necessary aspects (uncombine, 
> reverting DoFn output with DoOvers, sink integration, source-level 
> retractions, etc), and then implement them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-362) Move shared runner functionality out of SDK and into runners/core-java

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342241#comment-15342241
 ] 

ASF GitHub Bot commented on BEAM-362:
-

GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/510

[BEAM-362] Move some easy stuff into runners/core-java


---

This change moves a set of classes with no dependents, leaving them in the 
same Java packages for now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam runners-core

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/510.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #510


commit 77e397610beba6f8cfdaf7c60edaf5f4038dcf29
Author: Kenneth Knowles 
Date:   2016-06-20T18:54:20Z

Move some easy stuff into runners/core-java

This change moves a set of classes with no dependents,
leaving them in the same Java packages for now.

commit cf032738a101db7f6e0dbe1104114565a70fa7d0
Author: Kenneth Knowles 
Date:   2016-06-21T17:06:36Z

Directly instantiate SimpleDoFnRunner in its test suite

Previously, it went through a static utility class, creating
a quasi-circular dependency.




> Move shared runner functionality out of SDK and into runners/core-java
> --
>
> Key: BEAM-362
> URL: https://issues.apache.org/jira/browse/BEAM-362
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>






[GitHub] incubator-beam pull request #510: [BEAM-362] Move some easy stuff into runne...

2016-06-21 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/510

[BEAM-362] Move some easy stuff into runners/core-java


---

This change moves a set of classes with no dependents, leaving them in the 
same Java packages for now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam runners-core

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/510.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #510


commit 77e397610beba6f8cfdaf7c60edaf5f4038dcf29
Author: Kenneth Knowles 
Date:   2016-06-20T18:54:20Z

Move some easy stuff into runners/core-java

This change moves a set of classes with no dependents,
leaving them in the same Java packages for now.

commit cf032738a101db7f6e0dbe1104114565a70fa7d0
Author: Kenneth Knowles 
Date:   2016-06-21T17:06:36Z

Directly instantiate SimpleDoFnRunner in its test suite

Previously, it went through a static utility class, creating
a quasi-circular dependency.






[jira] [Commented] (BEAM-366) Support Display Data on Composite Transforms

2016-06-21 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342229#comment-15342229
 ] 

Ben Chambers commented on BEAM-366:
---

As mentioned, representing composites explicitly in the Dataflow runner is 
currently dependent on the improved Runner API.

> Support Display Data on Composite Transforms
> 
>
> Key: BEAM-366
> URL: https://issues.apache.org/jira/browse/BEAM-366
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ben Chambers
>
> Today, Dataflow doesn't represent composites all the way to the UI (it 
> reconstructs them from the name). This means it doesn't support attaching 
> Display Data to composites.
> With the runner API refactoring, Dataflow should start supporting composites, 
> at which point we should make sure that Display Data is plumbed through 
> properly.





[jira] [Updated] (BEAM-366) Support Display Data on Composite Transforms

2016-06-21 Thread Davor Bonaci (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Bonaci updated BEAM-366:
--
Assignee: (was: Davor Bonaci)

> Support Display Data on Composite Transforms
> 
>
> Key: BEAM-366
> URL: https://issues.apache.org/jira/browse/BEAM-366
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ben Chambers
>
> Today, Dataflow doesn't represent composites all the way to the UI (it 
> reconstructs them from the name). This means it doesn't support attaching 
> Display Data to composites.
> With the runner API refactoring, Dataflow should start supporting composites, 
> at which point we should make sure that Display Data is plumbed through 
> properly.





[jira] [Created] (BEAM-366) Support Display Data on Composite Transforms

2016-06-21 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-366:
-

 Summary: Support Display Data on Composite Transforms
 Key: BEAM-366
 URL: https://issues.apache.org/jira/browse/BEAM-366
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Ben Chambers
Assignee: Davor Bonaci


Today, Dataflow doesn't represent composites all the way to the UI (it 
reconstructs them from the name). This means it doesn't support attaching 
Display Data to composites.

With the runner API refactoring, Dataflow should start supporting composites, 
at which point we should make sure that Display Data is plumbed through 
properly.





[GitHub] incubator-beam pull request #509: Set runtime scope for Dataflow runner dep ...

2016-06-21 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/509

Set runtime scope for Dataflow runner dep on runners-core


---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam 
dataflow-runners-core

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #509


commit 828511c86ac411aa9c86fda8b181a4ba298c2744
Author: Kenneth Knowles 
Date:   2016-06-20T22:05:59Z

Set runtime scope for Dataflow runner dep on runners-core






[jira] [Commented] (BEAM-91) Retractions

2016-06-21 Thread Luke Cwik (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342094#comment-15342094
 ] 

Luke Cwik commented on BEAM-91:
---

Matt, do you have something you can share that goes into more detail?

> Retractions
> ---
>
> Key: BEAM-91
> URL: https://issues.apache.org/jira/browse/BEAM-91
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Tyler Akidau
>Assignee: Frances Perry
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> We still haven't added retractions to Beam, even though they're a core part 
> of the model. We should document all the necessary aspects (uncombine, 
> reverting DoFn output with DoOvers, sink integration, source-level 
> retractions, etc), and then implement them.





[jira] [Commented] (BEAM-91) Retractions

2016-06-21 Thread Matt Pouttu-Clarke (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342022#comment-15342022
 ] 

Matt Pouttu-Clarke commented on BEAM-91:


The design actually solves multiple problems: BEAM-25, BEAM-91, and
BEAM-101 share a common solution.

> Retractions
> ---
>
> Key: BEAM-91
> URL: https://issues.apache.org/jira/browse/BEAM-91
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Tyler Akidau
>Assignee: Frances Perry
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> We still haven't added retractions to Beam, even though they're a core part 
> of the model. We should document all the necessary aspects (uncombine, 
> reverting DoFn output with DoOvers, sink integration, source-level 
> retractions, etc), and then implement them.





[jira] [Commented] (BEAM-91) Retractions

2016-06-21 Thread Matt Pouttu-Clarke (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341996#comment-15341996
 ] 

Matt Pouttu-Clarke commented on BEAM-91:


I've solved this problem successfully several times since around 2007. It
requires implementing data versioning and treating data much like you would
treat code in GitHub. You could also call it streaming with parallel-universe
support, as some consumers may not want or need your retractions, while others
may have critical need of them (much like in the source-code world, some users
do not want immediate "upgrades"). Also, please note that it is just as
important to support retracting structural changes as it is to support
retracting data changes. I have mature and battle-tested designs in this area
if there's interest.

> Retractions
> ---
>
> Key: BEAM-91
> URL: https://issues.apache.org/jira/browse/BEAM-91
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Tyler Akidau
>Assignee: Frances Perry
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> We still haven't added retractions to Beam, even though they're a core part 
> of the model. We should document all the necessary aspects (uncombine, 
> reverting DoFn output with DoOvers, sink integration, source-level 
> retractions, etc), and then implement them.





[jira] [Updated] (BEAM-365) TextIO withoutSharding causes Flink to throw IllegalStateException

2016-06-21 Thread Pawel Szczur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pawel Szczur updated BEAM-365:
--
Description: 
The exception: 
{code}java.lang.IllegalStateException: Shard name template '' only generated 1 
distinct file names for 3 files{code}

The initial discussion took place some time ago; the {{withoutSharding}} was
then silently ignored by the runner.

Explanation from Aljoscha Krettek:
{quote}
Hi,
the issue is a bit more complicated and involves the Beam sink API and the
Flink runner.

I'll have to get a bit into how Beam sinks work. The base class for sinks
is Sink (TextIO.write gets translated to Write.to(new TextSink())).
Write.to normally gets translated to three ParDo operations that cooperate
to do the writing:

 - "Initialize": this does initial initialization of the Sink, this is run
only once, per sink, non-parallel.

 - "WriteBundles": this gets an initialized sink on a side-input and the
values to write on the main input. This runs in parallel, so for Flink, if
you set parallelism=6 you'll get 6 parallel instances of this operation at
runtime. This operation forwards information about where it writes to
downstream. This does not write to the final file location but an
intermediate staging location.

 - "Finalize": This gets the initialized sink on the main-input and and the
information about written files from "WriteBundles" as a side-input. This
also only runs once, non-parallel. Here we're writing the intermediate
files to a final location based on the sharding template.

The problem is that Write.to() and TextSink, as well as all other sinks,
are not aware of the number of shards. If you set "withoutSharding()" this
will set the shard template to "" (empty string) and the number of shards
to 1. "WriteBundles", however is not aware of this and will write 6
intermediate files if you set parallelism=6. In "Finalize" we will copy an
intermediate file to the same final location 6 times based on the sharding
template. The end result is that you only get one of the six result shards.

The reason why this only occurs in the Flink runner is that all other
runners have special overrides for TextIO.Write and AvroIO.Write that kick
in if sharding control is required. So, for the time being this is a Flink
runner bug and we might have to introduce special overrides as well until
this is solved in the general case.

Cheers,
Aljoscha
{quote}

Original discussion:
http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201606.mbox/%3CCAMdX74-VPUsNOc9NKue2A2tYXZisnHNZ7UkPWk82_TFexpnySg%40mail.gmail.com%3E

  was:
The exception: 
{code}java.lang.IllegalStateException: Shard name template '' only generated 1 
distinct file names for 3 files{code}

The initial discussion took place some time ago; the {{withoutSharding}} was
then silently ignored.

Explanation from Aljoscha Krettek:
{quote}
Hi,
the issue is a bit more complicated and involves the Beam sink API and the
Flink runner.

I'll have to get a bit into how Beam sinks work. The base class for sinks
is Sink (TextIO.write gets translated to Write.to(new TextSink())).
Write.to normally gets translated to three ParDo operations that cooperate
to do the writing:

 - "Initialize": this does initial initialization of the Sink, this is run
only once, per sink, non-parallel.

 - "WriteBundles": this gets an initialized sink on a side-input and the
values to write on the main input. This runs in parallel, so for Flink, if
you set parallelism=6 you'll get 6 parallel instances of this operation at
runtime. This operation forwards information about where it writes to
downstream. This does not write to the final file location but an
intermediate staging location.

 - "Finalize": This gets the initialized sink on the main-input and and the
information about written files from "WriteBundles" as a side-input. This
also only runs once, non-parallel. Here we're writing the intermediate
files to a final location based on the sharding template.

The problem is that Write.to() and TextSink, as well as all other sinks,
are not aware of the number of shards. If you set "withoutSharding()" this
will set the shard template to "" (empty string) and the number of shards
to 1. "WriteBundles", however is not aware of this and will write 6
intermediate files if you set parallelism=6. In "Finalize" we will copy an
intermediate file to the same final location 6 times based on the sharding
template. The end result is that you only get one of the six result shards.

The reason why this only occurs in the Flink runner is that all other
runners have special overrides for TextIO.Write and AvroIO.Write that kick
in if sharding control is required. So, for the time being this is a Flink
runner bug and we might have to introduce special overrides as well until
this is solved in the general case.

Cheers,
Aljoscha
{quote}

Original discussion:
http://mail-archives.apache.org/mod_mbo

[jira] [Created] (BEAM-365) TextIO withoutSharding causes Flink to throw IllegalStateException

2016-06-21 Thread Pawel Szczur (JIRA)
Pawel Szczur created BEAM-365:
-

 Summary: TextIO withoutSharding causes Flink to throw 
IllegalStateException
 Key: BEAM-365
 URL: https://issues.apache.org/jira/browse/BEAM-365
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Affects Versions: 0.2.0-incubating
Reporter: Pawel Szczur


The exception: 
{code}java.lang.IllegalStateException: Shard name template '' only generated 1 
distinct file names for 3 files{code}

The initial discussion took place some time ago; the {{withoutSharding}} was
then silently ignored.

Explanation from Aljoscha Krettek:
{quote}
Hi,
the issue is a bit more complicated and involves the Beam sink API and the
Flink runner.

I'll have to get a bit into how Beam sinks work. The base class for sinks
is Sink (TextIO.write gets translated to Write.to(new TextSink())).
Write.to normally gets translated to three ParDo operations that cooperate
to do the writing:

 - "Initialize": this does initial initialization of the Sink, this is run
only once, per sink, non-parallel.

 - "WriteBundles": this gets an initialized sink on a side-input and the
values to write on the main input. This runs in parallel, so for Flink, if
you set parallelism=6 you'll get 6 parallel instances of this operation at
runtime. This operation forwards information about where it writes to
downstream. This does not write to the final file location but an
intermediate staging location.

 - "Finalize": This gets the initialized sink on the main-input and and the
information about written files from "WriteBundles" as a side-input. This
also only runs once, non-parallel. Here we're writing the intermediate
files to a final location based on the sharding template.

The problem is that Write.to() and TextSink, as well as all other sinks,
are not aware of the number of shards. If you set "withoutSharding()" this
will set the shard template to "" (empty string) and the number of shards
to 1. "WriteBundles", however is not aware of this and will write 6
intermediate files if you set parallelism=6. In "Finalize" we will copy an
intermediate file to the same final location 6 times based on the sharding
template. The end result is that you only get one of the six result shards.

The reason why this only occurs in the Flink runner is that all other
runners have special overrides for TextIO.Write and AvroIO.Write that kick
in if sharding control is required. So, for the time being this is a Flink
runner bug and we might have to introduce special overrides as well until
this is solved in the general case.

Cheers,
Aljoscha
{quote}

Original discussion:
http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201606.mbox/%3CCAMdX74-VPUsNOc9NKue2A2tYXZisnHNZ7UkPWk82_TFexpnySg%40mail.gmail.com%3E
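To make the failure mode concrete, here is a toy model of sharded-name
construction — a simplified stand-in, not Beam's actual shard-template
implementation. With an empty template, every parallel bundle's intermediate
file maps to the same final name, which is exactly the reported
"only generated 1 distinct file names for 3 files" situation:

```python
def shard_file_name(prefix, shard_template, shard_num, num_shards):
    # Simplified stand-in for sharded-name construction: every 'S' in the
    # template is filled from the shard number and every 'N' from the shard
    # count. An empty template yields the bare prefix for every shard.
    name = shard_template.replace("S", str(shard_num)).replace("N", str(num_shards))
    return prefix + name

# withoutSharding() sets template="" and num_shards=1, but with parallelism=3
# three "WriteBundles" instances still produce 3 intermediate files.
# "Finalize" then maps all of them onto one final name: only one shard survives.
names = {shard_file_name("out", "", shard, 1) for shard in range(3)}
assert names == {"out"}  # 3 intermediate files, 1 distinct final name

# A non-empty template such as "-SSSSS-of-NNNNN" keeps the names distinct.
distinct = {shard_file_name("out", "-SSSSS-of-NNNNN", s, 3) for s in range(3)}
assert len(distinct) == 3
```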





[incubator-beam] Git Push Summary

2016-06-21 Thread rmannibucau
Repository: incubator-beam
Updated Branches:
  refs/heads/BEAM-357_windows-build-fails [deleted] 460d21cb7


[1/2] incubator-beam git commit: fixing build on windows

2016-06-21 Thread rmannibucau
Repository: incubator-beam
Updated Branches:
  refs/heads/BEAM-357_windows-build-fails [created] 460d21cb7


fixing build on windows


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41883300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41883300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41883300

Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 418833001fe6dd581f42f7fcc3c35ef36f292007
Parents: 0e4d0a9
Author: Romain manni-Bucau 
Authored: Sun Jun 19 21:19:57 2016 +0200
Committer: Romain manni-Bucau 
Committed: Sun Jun 19 21:19:57 2016 +0200

--
 .../beam/runners/flink/WriteSinkITCase.java |  13 +
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +
 .../beam/runners/spark/io/AvroPipelineTest.java |   7 +
 .../beam/runners/spark/io/NumShardsTest.java|   7 +
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   7 +
 .../translation/TransformTranslatorTest.java|   7 +
 .../src/main/resources/beam/checkstyle.xml  |   4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |   7 +-
 .../beam/sdk/testing/HadoopWorkarounds.java | 129 +
 sdks/java/io/hdfs/pom.xml   |   9 +
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java| 264 ++-
 sdks/java/maven-archetypes/starter/pom.xml  |   3 +
 12 files changed, 334 insertions(+), 131 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 36d3aef..1a56350 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
 
@@ -75,6 +76,18 @@ public class WriteSinkITCase extends JavaProgramTestBase {
 p.run();
   }
 
+
+  @Override
+  public void stopCluster() throws Exception {
+try {
+  super.stopCluster();
+} catch (final IOException ioe) {
+  if (ioe.getMessage().startsWith("Unable to delete file")) {
+// that's ok for the test itself, just the OS playing with us on 
cleanup phase
+  }
+}
+  }
+
   /**
* Simple custom sink which writes to a file.
*/

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 2b4464d..4980995 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -40,11 +41,13 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -61,6 +64,11 @@ public class SimpleWordCountTest {
   private static final Set EXPECTED_COUNT_SET =
   ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
+  @BeforeClass
+  public static void initWin() throws IOException {
+HadoopWorkarounds.winTests();
+  }
+
   @Test
   public void testInMem() throws Exception {
 SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
--
dif

[2/2] incubator-beam git commit: better comments for win workaround and basic sanity checks for winutils.exe

2016-06-21 Thread rmannibucau
better comments for win workaround and basic sanity checks for winutils.exe


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/460d21cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/460d21cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/460d21cb

Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 460d21cb7070603f789da9d13e12668194c91e9b
Parents: 4188330
Author: Romain manni-Bucau 
Authored: Tue Jun 21 10:37:05 2016 +0200
Committer: Romain manni-Bucau 
Committed: Tue Jun 21 10:37:05 2016 +0200

--
 .../beam/runners/flink/WriteSinkITCase.java |   2 +-
 .../beam/sdk/testing/HadoopWorkarounds.java | 109 +--
 sdks/java/io/hdfs/pom.xml   |   9 --
 sdks/java/maven-archetypes/starter/pom.xml  |   6 +-
 4 files changed, 104 insertions(+), 22 deletions(-)
--
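The commit message mentions "basic sanity checks for winutils.exe". A minimal sketch of the download-then-check shape such code could take (method names here are illustrative, not Beam's):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;

/**
 * Illustrative sketch of "download a binary, then sanity-check it",
 * as one might do for winutils.exe. Not the actual Beam code.
 */
public class DownloadSanitySketch {

    /** Copies the URL's content to target, returning the byte count. */
    static long download(URL source, File target) throws IOException {
        try (InputStream in = source.openStream();
             OutputStream out = new FileOutputStream(target)) {
            byte[] buffer = new byte[8192];
            long total = 0;
            int read;
            while ((read = in.read(buffer)) >= 0) {
                out.write(buffer, 0, read);
                total += read;
            }
            return total;
        }
    }

    /**
     * Basic sanity check: the file exists and is non-empty. A truncated
     * download or an error page saved as .exe would still pass this, which
     * is why the commit also fetches the .asc signature for verification.
     */
    static boolean looksSane(File file) {
        return file.isFile() && file.length() > 0;
    }
}
```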


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
--
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 1a56350..bb3778d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -54,7 +54,7 @@ public class WriteSinkITCase extends JavaProgramTestBase {
 
   @Override
   protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
+    resultPath = getTempDirPath("result-" + System.nanoTime());
   }
 
   @Override

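The WriteSinkITCase change swaps a fixed directory name for one suffixed with `System.nanoTime()`, so repeated runs on the same machine don't collide on a leftover `result` directory. The pattern, as a standalone sketch (`uniqueDir` is this sketch's name, not Flink's test API):

```java
import java.io.File;

/** Sketch of the unique-temp-dir pattern used in the WriteSinkITCase fix. */
public class UniqueTempDirSketch {

    /** Builds base/prefix-<nanos>; nanoTime makes clashes between runs unlikely. */
    static File uniqueDir(File base, String prefix) {
        return new File(base, prefix + "-" + System.nanoTime());
    }
}
```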
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
index ee2e135..1c2aa20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static java.util.Arrays.asList;
+
 import org.apache.commons.compress.utils.IOUtils;
 
 import java.io.File;
@@ -26,15 +28,21 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Files;
+import java.util.Arrays;
 
 /**
  * A simple class ensuring winutils.exe can be found in the JVM.
+ * 
+ * See http://wiki.apache.org/hadoop/WindowsProblems for details.
+ * 
+ * Note: don't forget to add org.bouncycastle:bcpg-jdk16 dependency to use it.
  */
 public class HadoopWorkarounds {
 /**
  * In practise this method only needs to be called once by JVM
  * since hadoop uses static variables to store it.
- *
+ * 
  * Note: ensure invocation is done before hadoop reads it
  * and ensure this folder survives tests
  * (avoid temporary folder usage since tests can share it).
@@ -51,6 +59,8 @@ public class HadoopWorkarounds {
// hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051
 // so use this github repo temporarly then just use the main tar.gz
 /*
+// note this commented code requires commons-compress dependency (to add if we use that)
+
 String hadoopVersion = VersionInfo.getVersion();
final URL url = new URL("https://archive.apache.org/dist/hadoop/common/hadoop-"
    + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz");
@@ -97,19 +107,49 @@ public class HadoopWorkarounds {
 + "-Dhadoop.home.dir so we'll download winutils.exe");
 
 new File(hadoopHome, "bin").mkdirs();
-final URL url;
-try {
-url = new URL("https://github.com/steveloughran/winutils/"
-+ "raw/master/hadoop-2.7.1/bin/winutils.exe");
-} catch (final MalformedURLException e) { // unlikely
-throw new IllegalArgumentException(e);
+final File winutils = new File(hadoopHome, "bin/winutils.exe");
+
+for (final String suffix : asList("", ".asc")) {
+final URL url;
+try {
+// this code is not a random URL - read HADOOP-10051
+// it is provided and signed with an ASF gpg key.
+
+// note: 2.6.3 cause 2.6.4, 2.7.1 don't have .asc
+url = new UR