Jenkins build is still unstable: beam_PostCommit_RunnableOnService_SparkLocal #130

2016-11-11 Thread Apache Jenkins Server




[jira] [Commented] (BEAM-477) Use HTTPS for maven repositories

2016-11-11 Thread Jagan Vujjini (JIRA)

[ https://issues.apache.org/jira/browse/BEAM-477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15659141#comment-15659141 ]

Jagan Vujjini commented on BEAM-477:


[~kenn] Is the only change here to enforce that the Maven version be in [3.2.3,), or am I reading
it wrong?

> Use HTTPS for maven repositories
> 
>
> Key: BEAM-477
> URL: https://issues.apache.org/jira/browse/BEAM-477
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>  Labels: easy, easyfix, newbie, starter
>
> This can be done either by requiring Maven >= 3.2.3 or by following the
> instructions here:
> http://central.sonatype.org/pages/consumers.html#apache-maven
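
In Maven's version-range notation, `[3.2.3,)` has an inclusive lower bound and no upper bound, i.e. "3.2.3 or later". The sketch below only illustrates that reading with a plain numeric comparison. It is not Maven's own version comparison (qualifiers such as `-SNAPSHOT` are not handled), and the class and method names are hypothetical.

```java
import java.util.Arrays;

final class MavenVersionCheck {
  // Parses "3.2.3" into {3, 2, 3}. Assumes purely numeric, dot-separated parts.
  static int[] parse(String version) {
    return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
  }

  // Returns true when actual >= minimum, comparing component by component.
  // Missing trailing components are treated as 0, so "3.3" equals "3.3.0".
  static boolean atLeast(String actual, String minimum) {
    int[] a = parse(actual);
    int[] m = parse(minimum);
    for (int i = 0; i < Math.max(a.length, m.length); i++) {
      int x = i < a.length ? a[i] : 0;
      int y = i < m.length ? m[i] : 0;
      if (x != y) {
        return x > y;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // The lower bound itself is inside the range [3.2.3,).
    System.out.println(atLeast("3.2.3", "3.2.3"));
    System.out.println(atLeast("3.3.9", "3.2.3"));
    System.out.println(atLeast("3.0.5", "3.2.3"));
  }
}
```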



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #822

2016-11-11 Thread Apache Jenkins Server




[GitHub] incubator-beam pull request #1355: [BEAM-498] Treat ProcessContext and Conte...

2016-11-11 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/1355

[BEAM-498] Treat ProcessContext and Context like other DoFn parameters

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

Before this change, `ProcessContext` and `Context` were special-cased, 
while other parameters were treated generically. After this change, all 
parameters receive the same dynamic treatment. Thus a user need not request 
these parameters, the runner can save the effort of constructing them, and we 
can migrate towards less "all-in-one" parameters.

I changed only the analysis and code generation a bit to remove special 
casing, but did not write new delegations that would actually add dynamic 
parameter generation to methods that do not already have them, so only 
`@ProcessElement` and `@OnTimer` are affected.
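
A minimal sketch of the idea described above, under stated assumptions: every requested parameter is resolved through the same type-based lookup, and a method that requests nothing causes nothing to be constructed. This is not the PR's implementation. The PR works on analysis and code generation, whereas this sketch substitutes plain reflection, and all type names (`ProcessMethod`, `ElementContext`, `Window`) are hypothetical stand-ins rather than Beam SDK types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class DynamicParamsSketch {
  // Hypothetical stand-in for an annotation marking the processing method.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessMethod {}

  // Hypothetical parameter types that a method may request.
  static final class ElementContext {
    final String element;
    ElementContext(String element) { this.element = element; }
  }

  static final class Window {
    final String name;
    Window(String name) { this.name = name; }
  }

  // Requests both parameters.
  static final class WithContext {
    @ProcessMethod
    public String process(ElementContext c, Window w) {
      return c.element + "@" + w.name;
    }
  }

  // Requests nothing, so no context object is ever built for it.
  static final class NoParams {
    @ProcessMethod
    public String process() {
      return "no-args";
    }
  }

  // Analysis step: locate the annotated method.
  static Method findProcessMethod(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessMethod.class)) {
        return m;
      }
    }
    throw new IllegalArgumentException("No @ProcessMethod method on " + fnClass.getName());
  }

  // Invocation step: every parameter goes through the same type-based lookup,
  // with no special case for the context type.
  static Object invoke(Object fn, String element, String window) {
    Method m = findProcessMethod(fn.getClass());
    Class<?>[] types = m.getParameterTypes();
    Object[] args = new Object[types.length];
    for (int i = 0; i < types.length; i++) {
      if (types[i] == ElementContext.class) {
        args[i] = new ElementContext(element);
      } else if (types[i] == Window.class) {
        args[i] = new Window(window);
      } else {
        throw new IllegalArgumentException("Unsupported parameter: " + types[i].getName());
      }
    }
    try {
      m.setAccessible(true);
      return m.invoke(fn, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(invoke(new WithContext(), "a", "w1"));
    System.out.println(invoke(new NoParams(), "a", "w1"));
  }
}
```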

Posted for self-review & signposting.

CC: @tgroh who has discussed this and @jkff who may have interest.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam ProcessContext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1355


commit 959e906a7591eaa40c3c2451f0c3dcfdffc50c32
Author: Kenneth Knowles 
Date:   2016-11-11T22:55:16Z

Treat ProcessContext and Context like other DoFn parameters

Before this change, ProcessContext and Context were special-cased, while
other parameters were treated generically. After this change, all parameters
receive the same dynamic treatment. Thus a user need not request these
parameters, the runner can save the effort of constructing them, and we
can migrate towards less "all-in-one" parameters.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-498) Make DoFnWithContext the new DoFn

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/BEAM-498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15659025#comment-15659025 ]

ASF GitHub Bot commented on BEAM-498:
-

GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/1355

[BEAM-498] Treat ProcessContext and Context like other DoFn parameters


> Make DoFnWithContext the new DoFn
> -
>
> Key: BEAM-498
> URL: https://issues.apache.org/jira/browse/BEAM-498
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>  Labels: backward-incompatible
>






[GitHub] incubator-beam pull request #1354: Add IP configuration to Python SDK

2016-11-11 Thread sammcveety
GitHub user sammcveety opened a pull request:

https://github.com/apache/incubator-beam/pull/1354

Add IP configuration to Python SDK

R: @chamikaramj 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sammcveety/incubator-beam sgmc/private_ip

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1354.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1354


commit e8a188ef37bb837e3c9b4253464f5d5375442ce6
Author: Sam McVeety 
Date:   2016-11-12T03:56:34Z

Add IP configuration to Python SDK






Jenkins build is still unstable: beam_PostCommit_RunnableOnService_SparkLocal #129

2016-11-11 Thread Apache Jenkins Server




Jenkins build is still unstable: beam_PostCommit_RunnableOnService_SparkLocal #128

2016-11-11 Thread Apache Jenkins Server




Jenkins build is back to stable : beam_PostCommit_MavenVerify #1802

2016-11-11 Thread Apache Jenkins Server



Jenkins build became unstable: beam_PostCommit_MavenVerify #1801

2016-11-11 Thread Apache Jenkins Server



Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #821

2016-11-11 Thread Apache Jenkins Server




Jenkins build is still unstable: beam_PostCommit_RunnableOnService_SparkLocal #127

2016-11-11 Thread Apache Jenkins Server




[jira] [Created] (BEAM-967) Add Jenkins postcommit for Apex runner RunnableOnService tests

2016-11-11 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-967:


 Summary: Add Jenkins postcommit for Apex runner RunnableOnService tests
 Key: BEAM-967
 URL: https://issues.apache.org/jira/browse/BEAM-967
 Project: Beam
  Issue Type: Bug
  Components: testing
Reporter: Kenneth Knowles
Assignee: Davor Bonaci


During development of the Apex runner, all RunnableOnService tests were run in 
precommit. Now that it has been merged, they are not. It needs the usual 
treatment that we give runners.





Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #820

2016-11-11 Thread Apache Jenkins Server




[01/39] incubator-beam git commit: BEAM-261 Apex runner PoC

2016-11-11 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master e2c21599d -> 7d069a65b


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
new file mode 100644
index 0000000..efb69ee
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+
+public interface ApexStreamTuple<T>
+{
+  /**
+   * Gets the value of the tuple
+   *
+   * @return
+   */
+  T getValue();
+
+  /**
+   * Plain tuple class
+   *
+   * @param <T>
+   */
+  class DataTuple<T> implements ApexStreamTuple<T>
+  {
+    private T value;
+
+    public static <T> DataTuple<T> of(T value) {
+      return new DataTuple<>(value);
+    }
+
+    private DataTuple(T value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public T getValue()
+    {
+      return value;
+    }
+
+    public void setValue(T value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public String toString()
+    {
+      return value.toString();
+    }
+
+  }
+
+  /**
+   * Tuple that includes a timestamp
+   *
+   * @param <T>
+   */
+  class TimestampedTuple<T> extends DataTuple<T>
+  {
+    private long timestamp;
+
+    public TimestampedTuple(long timestamp, T value)
+    {
+      super(value);
+      this.timestamp = timestamp;
+    }
+
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+  }
+
+  /**
+   * Tuple that represents a watermark
+   *
+   * @param <T>
+   */
+  class WatermarkTuple<T> extends TimestampedTuple<T>
+  {
+    public static <T> WatermarkTuple<T> of(long timestamp) {
+      return new WatermarkTuple<>(timestamp);
+    }
+
+    protected WatermarkTuple(long timestamp)
+    {
+      super(timestamp, null);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "[Watermark " + getTimestamp() + "]";
+    }
+  }
+
+  /**
+   * Coder for {@link ApexStreamTuple}.
+   */
+  public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+    private static final long serialVersionUID = 1L;
+    final Coder<T> valueCoder;
+
+    public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
+      return new ApexStreamTupleCoder<>(valueCoder);
+    }
+
+    protected ApexStreamTupleCoder(Coder<T> valueCoder) {
+      this.valueCoder = checkNotNull(valueCoder);
+    }
+
+    @Override
+    public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      if (value instanceof WatermarkTuple) {
+        outStream.write(1);
+        new DataOutputStream(outStream).writeLong(((WatermarkTuple)value).getTimestamp());
+      } else {
+        outStream.write(0);
+        valueCoder.encode(value.getValue(), outStream, context);
+      }
+    }
+
+    @Override
+    public ApexStreamTuple<T> decode(InputStream inStream, Context context)
+        throws CoderException, IOException
+    {
+      int b = inStream.read();
+      if (b == 1) {
+        return new WatermarkTuple(new DataInputStream(inStream).readLong());
+      } else {
+        return new DataTuple(valueCoder.decode(inStream, context));
+      }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments()
+    {
+      return Arrays.<Coder<?>>asList(valueCoder);
+    }
+
+    @Override
+
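
The coder in the diff above distinguishes watermarks from data tuples with a single leading tag byte: `1` followed by the timestamp as a long for a watermark, `0` followed by the encoded value otherwise. A self-contained sketch of that wire layout, assuming a UTF string stands in for the value coder. The class and method names are hypothetical and no Beam types are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class TaggedTupleCodecSketch {
  static final int DATA_TAG = 0;
  static final int WATERMARK_TAG = 1;

  // Watermark layout: tag byte 1, then the timestamp as an 8-byte big-endian long.
  static byte[] encodeWatermark(long timestamp) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      out.write(WATERMARK_TAG);
      new DataOutputStream(out).writeLong(timestamp);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  // Data layout: tag byte 0, then the value. writeUTF stands in for the
  // delegated value coder (an assumption made for this sketch).
  static byte[] encodeData(String value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      out.write(DATA_TAG);
      new DataOutputStream(out).writeUTF(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  // Reads the tag byte first and dispatches on it, mirroring decode() in the diff.
  static String decode(byte[] bytes) {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    try {
      int tag = in.read();
      if (tag == WATERMARK_TAG) {
        return "[Watermark " + in.readLong() + "]";
      }
      return in.readUTF();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(encodeWatermark(42L)));
    System.out.println(decode(encodeData("hello")));
  }
}
```

One consequence of this layout is that a watermark always occupies nine bytes (one tag byte plus an eight-byte long), independent of the value coder.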

[14/39] incubator-beam git commit: Adjust for merge from master.

2016-11-11 Thread kenn
Adjust for merge from master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a1b2789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a1b2789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a1b2789

Branch: refs/heads/master
Commit: 0a1b2789589ad834736d2998d00e43f554a56432
Parents: 6a971d6
Author: Thomas Weise 
Authored: Tue Oct 25 09:16:52 2016 -0700
Committer: Thomas Weise 
Committed: Tue Oct 25 09:16:52 2016 -0700

--
 runners/apex/pom.xml   | 2 +-
 .../beam/runners/apex/translators/utils/ApexStreamTuple.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a1b2789/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8b62410..2239d03 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
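@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.3.0-incubating-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>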
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a1b2789/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index a260a66..7f8b0fa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -134,7 +134,7 @@ public interface ApexStreamTuple<T> {
   /**
    * Coder for {@link ApexStreamTuple}.
    */
-  public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+  class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
     private static final long serialVersionUID = 1L;
     final Coder<T> valueCoder;
 



[11/39] incubator-beam git commit: This closes #540

2016-11-11 Thread kenn
This closes #540


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9f14066
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9f14066
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9f14066

Branch: refs/heads/master
Commit: c9f140f2c86128727534f42b10fb8f93981d
Parents: b8e6eea 9454b3b
Author: Kenneth Knowles 
Authored: Mon Oct 17 09:47:49 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Oct 17 09:47:49 2016 -0700

--
 runners/apex/pom.xml| 223 +
 .../beam/runners/apex/ApexPipelineOptions.java  |  66 +++
 .../runners/apex/ApexPipelineTranslator.java| 185 
 .../apache/beam/runners/apex/ApexRunner.java| 329 +
 .../beam/runners/apex/ApexRunnerResult.java |  87 
 .../beam/runners/apex/TestApexRunner.java   |  59 +++
 .../apache/beam/runners/apex/package-info.java  |  22 +
 .../translators/CreateValuesTranslator.java |  48 ++
 .../FlattenPCollectionTranslator.java   | 129 ++
 .../apex/translators/GroupByKeyTranslator.java  |  41 ++
 .../translators/ParDoBoundMultiTranslator.java  | 130 ++
 .../apex/translators/ParDoBoundTranslator.java  |  62 +++
 .../translators/ReadUnboundedTranslator.java|  42 ++
 .../apex/translators/TransformTranslator.java   |  31 ++
 .../apex/translators/TranslationContext.java| 168 +++
 .../functions/ApexFlattenOperator.java  | 124 +
 .../functions/ApexGroupByKeyOperator.java   | 462 +++
 .../functions/ApexParDoOperator.java| 374 +++
 .../translators/functions/package-info.java |  22 +
 .../io/ApexReadUnboundedInputOperator.java  | 145 ++
 .../apex/translators/io/ValuesSource.java   | 149 ++
 .../apex/translators/io/package-info.java   |  22 +
 .../runners/apex/translators/package-info.java  |  22 +
 .../apex/translators/utils/ApexStreamTuple.java | 205 
 .../utils/CoderAdapterStreamCodec.java  |  69 +++
 .../apex/translators/utils/NoOpStepContext.java |  72 +++
 .../utils/SerializablePipelineOptions.java  |  60 +++
 .../utils/ValueAndCoderKryoSerializable.java|  77 
 .../apex/translators/utils/package-info.java|  22 +
 .../apex/examples/StreamingWordCountTest.java   | 121 +
 .../apex/examples/UnboundedTextSource.java  | 142 ++
 .../runners/apex/examples/package-info.java |  22 +
 .../FlattenPCollectionTranslatorTest.java   |  99 
 .../translators/GroupByKeyTranslatorTest.java   | 245 ++
 .../translators/ParDoBoundTranslatorTest.java   | 206 +
 .../translators/ReadUnboundTranslatorTest.java  | 129 ++
 .../translators/utils/CollectionSource.java | 136 ++
 .../translators/utils/PipelineOptionsTest.java  |  84 
 .../apex/src/test/resources/log4j.properties|  35 ++
 runners/pom.xml |   1 +
 40 files changed, 4667 insertions(+)
--




[35/39] incubator-beam git commit: Closes #1213

2016-11-11 Thread kenn
Closes #1213


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c08ebbe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c08ebbe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c08ebbe7

Branch: refs/heads/master
Commit: c08ebbe79dc86230f6a3f0b983f861ebe039917c
Parents: 9197d1e 5553c60
Author: Thomas Weise 
Authored: Mon Nov 7 19:00:31 2016 -0800
Committer: Thomas Weise 
Committed: Mon Nov 7 19:00:31 2016 -0800

--
 .../runners/apex/ApexPipelineTranslator.java| 185 ---
 .../apache/beam/runners/apex/ApexRunner.java|  13 +-
 .../beam/runners/apex/ApexRunnerResult.java |   2 +-
 .../translation/ApexPipelineTranslator.java | 179 +++
 .../translation/CreateValuesTranslator.java |  48 ++
 .../FlattenPCollectionTranslator.java   | 129 +
 .../apex/translation/GroupByKeyTranslator.java  |  42 ++
 .../translation/ParDoBoundMultiTranslator.java  | 142 ++
 .../apex/translation/ParDoBoundTranslator.java  |  64 +++
 .../translation/ReadUnboundedTranslator.java|  42 ++
 .../apex/translation/TransformTranslator.java   |  31 ++
 .../apex/translation/TranslationContext.java| 178 +++
 .../operators/ApexFlattenOperator.java  | 125 +
 .../operators/ApexGroupByKeyOperator.java   | 478 +++
 .../operators/ApexParDoOperator.java| 375 +++
 .../ApexReadUnboundedInputOperator.java | 155 ++
 .../translation/operators/package-info.java |  22 +
 .../runners/apex/translation/package-info.java  |  22 +
 .../translation/utils/ApexStateInternals.java   | 438 +
 .../apex/translation/utils/ApexStreamTuple.java | 222 +
 .../utils/CoderAdapterStreamCodec.java  |  69 +++
 .../apex/translation/utils/NoOpStepContext.java |  72 +++
 .../utils/SerializablePipelineOptions.java  |  60 +++
 .../utils/ValueAndCoderKryoSerializable.java|  77 +++
 .../apex/translation/utils/ValuesSource.java| 149 ++
 .../apex/translation/utils/package-info.java|  22 +
 .../translators/CreateValuesTranslator.java |  48 --
 .../FlattenPCollectionTranslator.java   | 129 -
 .../apex/translators/GroupByKeyTranslator.java  |  42 --
 .../translators/ParDoBoundMultiTranslator.java  | 142 --
 .../apex/translators/ParDoBoundTranslator.java  |  64 ---
 .../translators/ReadUnboundedTranslator.java|  42 --
 .../apex/translators/TransformTranslator.java   |  31 --
 .../apex/translators/TranslationContext.java| 178 ---
 .../functions/ApexFlattenOperator.java  | 125 -
 .../functions/ApexGroupByKeyOperator.java   | 478 ---
 .../functions/ApexParDoOperator.java| 375 ---
 .../translators/functions/package-info.java |  22 -
 .../io/ApexReadUnboundedInputOperator.java  | 154 --
 .../apex/translators/io/ValuesSource.java   | 149 --
 .../apex/translators/io/package-info.java   |  22 -
 .../runners/apex/translators/package-info.java  |  22 -
 .../translators/utils/ApexStateInternals.java   | 438 -
 .../apex/translators/utils/ApexStreamTuple.java | 222 -
 .../utils/CoderAdapterStreamCodec.java  |  69 ---
 .../apex/translators/utils/NoOpStepContext.java |  72 ---
 .../utils/SerializablePipelineOptions.java  |  60 ---
 .../utils/ValueAndCoderKryoSerializable.java|  77 ---
 .../apex/translators/utils/package-info.java|  22 -
 .../translation/ApexGroupByKeyOperatorTest.java | 112 +
 .../FlattenPCollectionTranslatorTest.java   |  99 
 .../translation/GroupByKeyTranslatorTest.java   | 246 ++
 .../translation/ParDoBoundTranslatorTest.java   | 340 +
 .../translation/ReadUnboundTranslatorTest.java  | 129 +
 .../utils/ApexStateInternalsTest.java   | 361 ++
 .../translation/utils/CollectionSource.java | 136 ++
 .../translation/utils/PipelineOptionsTest.java  |  84 
 .../translators/ApexGroupByKeyOperatorTest.java | 112 -
 .../FlattenPCollectionTranslatorTest.java   |  99 
 .../translators/GroupByKeyTranslatorTest.java   | 246 --
 .../translators/ParDoBoundTranslatorTest.java   | 340 -
 .../translators/ReadUnboundTranslatorTest.java  | 129 -
 .../utils/ApexStateInternalsTest.java   | 361 --
 .../translators/utils/CollectionSource.java | 136 --
 .../translators/utils/PipelineOptionsTest.java  |  84 
 65 files changed, 4653 insertions(+), 4685 deletions(-)
--




[26/39] incubator-beam git commit: BEAM-844 Add README, wordcount test, fix View.AsIterable, initial GBK watermark.

2016-11-11 Thread kenn
BEAM-844 Add README, wordcount test, fix View.AsIterable, initial GBK watermark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a21550f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a21550f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a21550f7

Branch: refs/heads/master
Commit: a21550f7ae3004e460ca6dfb33102a9fb191356c
Parents: 51af7e5
Author: Thomas Weise 
Authored: Thu Nov 3 05:40:46 2016 +0100
Committer: Thomas Weise 
Committed: Fri Nov 4 23:01:17 2016 +0100

--
 runners/apex/README.md  |  76 
 runners/apex/pom.xml|   1 +
 .../apache/beam/runners/apex/ApexRunner.java|  82 
 .../functions/ApexGroupByKeyOperator.java   |   2 +-
 .../io/ApexReadUnboundedInputOperator.java  |   2 +-
 .../apex/translators/utils/ApexStreamTuple.java |  21 ++-
 .../apex/examples/StreamingWordCountTest.java   | 121 
 .../runners/apex/examples/WordCountTest.java| 188 +++
 .../translators/ApexGroupByKeyOperatorTest.java | 112 +++
 .../translators/GroupByKeyTranslatorTest.java   |   1 +
 runners/apex/src/test/resources/words.txt   |   3 +
 11 files changed, 484 insertions(+), 125 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/README.md
--
diff --git a/runners/apex/README.md b/runners/apex/README.md
new file mode 100644
index 0000000..c9e47a1
--- /dev/null
+++ b/runners/apex/README.md
@@ -0,0 +1,76 @@
+
+
+Apex Beam Runner (Apex-Runner)
+==============================
+
+Apex-Runner is a Runner for Apache Beam which executes Beam pipelines with Apache Apex as the underlying engine. The runner has broad support for the Beam model and supports streaming and batch pipelines.
+
+[Apache Apex](http://apex.apache.org/) is a stream processing platform and framework for low-latency, high-throughput and fault-tolerant analytics applications on Apache Hadoop. Apex is Java based and also provides its own API for application development (native compositional and declarative Java API, SQL) with a comprehensive [operator library](https://github.com/apache/apex-malhar). Apex has a unified streaming architecture and can be used for real-time and batch processing. With its stateful stream processing architecture Apex can support all of the concepts in the Beam model (event time, triggers, watermarks etc.).
+
+##Status
+
+Apex-Runner is relatively new. It is fully functional and can currently be used to run pipelines in embedded mode. It does not yet take advantage of all the performance and scalability that Apex can deliver. This is expected to be addressed with upcoming work, leveraging features like incremental checkpointing, partitioning and operator affinity from Apex. Please see [JIRA](https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20runner-apex%20AND%20resolution%20%3D%20Unresolved); we welcome contributions!
+
+##Getting Started
+
+The following shows how to run the WordCount example that is provided with the source code on Apex (the example is identical to the one provided as part of the Beam examples).
+
+###Installing Beam
+
+To get the latest version of Beam with Apex-Runner, first clone the Beam repository:
+
+```
+git clone https://github.com/apache/incubator-beam
+```
+
+Then switch to the newly created directory and run Maven to build Apache Beam:
+
+```
+cd incubator-beam
+mvn clean install -DskipTests
+```
+
+Now Apache Beam and the Apex Runner are installed in your local Maven repository.
+
+###Running an Example
+
+Download something to count:
+
+```
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+```
+
+Run the pipeline, using the Apex runner:
+
+```
+cd examples/java
+mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=ApexRunner" -Pinclude-runners
+```
+
+Once completed, there will be multiple output files with the base name given above:
+
+```
+$ ls /tmp/out-*
+/tmp/out-0-of-3  /tmp/out-1-of-3  /tmp/out-2-of-3
+```
+
+##Running pipelines on an Apex YARN cluster
+
+Coming soon.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index d4bcc3d..1ca61b9 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -215,6 +215,7 @@
   
 
org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT
 
org.apache

[38/39] incubator-beam git commit: Add beam-runners-apex dependency.

2016-11-11 Thread kenn
Add beam-runners-apex dependency.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41394c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41394c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41394c21

Branch: refs/heads/master
Commit: 41394c2109fa268a1637d4bb641827ef246c3e23
Parents: 320d619
Author: Thomas Weise 
Authored: Fri Nov 11 13:13:33 2016 -0800
Committer: Thomas Weise 
Committed: Fri Nov 11 13:13:33 2016 -0800

--
 pom.xml | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41394c21/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 22897e3..bd9448a 100644
--- a/pom.xml
+++ b/pom.xml
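@@ -364,6 +364,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-apex</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-examples-java</artifactId>
         <version>${project.version}</version>
       </dependency>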



[29/39] incubator-beam git commit: Adjust for merge from master.

2016-11-11 Thread kenn
Adjust for merge from master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9197d1e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9197d1e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9197d1e0

Branch: refs/heads/master
Commit: 9197d1e05e034f8dea0b413c09e7428e90294509
Parents: 5ccd5fb
Author: Thomas Weise 
Authored: Mon Nov 7 20:29:08 2016 +0100
Committer: Thomas Weise 
Committed: Mon Nov 7 20:29:08 2016 +0100

--
 .../translators/functions/ApexGroupByKeyOperator.java | 14 ++
 1 file changed, 14 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9197d1e0/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 98f3eca..4c28c85 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -60,12 +60,14 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -451,6 +453,18 @@ public class ApexGroupByKeyOperator implements Operator {
       // TODO Auto-generated method stub
       return null;
     }
+
+    @Override
+    public void setTimer(StateNamespace namespace, String timerId, Instant target,
+        TimeDomain timeDomain) {
+      throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+    }
+
   }
 
   private class GroupByKeyStateInternalsFactory implements StateInternalsFactory, Serializable {



[06/39] incubator-beam git commit: BEAM-261 Add support for ParDo.BoundMulti

2016-11-11 Thread kenn
BEAM-261 Add support for ParDo.BoundMulti


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/047cff49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/047cff49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/047cff49

Branch: refs/heads/master
Commit: 047cff492f1f804785dee73b4768293d3569e8de
Parents: 0975494
Author: Thomas Weise 
Authored: Thu Oct 6 22:36:01 2016 -0700
Committer: Thomas Weise 
Committed: Sun Oct 16 23:27:15 2016 -0700

--
 runners/apex/pom.xml|  3 +-
 .../runners/apex/ApexPipelineTranslator.java|  2 +
 .../apache/beam/runners/apex/ApexRunner.java|  3 +-
 .../FlattenPCollectionTranslator.java   |  1 +
 .../translators/ParDoBoundMultiTranslator.java  | 74 
 .../apex/translators/ParDoBoundTranslator.java  |  5 +-
 .../apex/translators/TranslationContext.java| 17 +
 .../functions/ApexFlattenOperator.java  |  2 +
 .../functions/ApexParDoOperator.java| 68 ++
 .../FlattenPCollectionTranslatorTest.java   | 42 +--
 .../translators/ParDoBoundTranslatorTest.java   | 29 
 11 files changed, 194 insertions(+), 52 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index e9377b4..929feb4 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,8 +185,7 @@
   
 
   [
-"--runner=org.apache.beam.runners.apex.TestApexRunner",
-"--streaming=true"
+"--runner=org.apache.beam.runners.apex.TestApexRunner"
   ]
 
   

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index ad8c283..40edfb1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
 import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.apex.translators.ParDoBoundMultiTranslator;
 import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
 import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.apex.translators.TransformTranslator;
@@ -66,6 +67,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
   static {
 // register TransformTranslators
 registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>());
 registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
 registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
 registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
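
The hunk above adds one more entry to a static, class-keyed translator table. A rough sketch of how such a registry can work is given below; everything in it (the Translator interface, the Single and Multi stand-in transform classes, the register and translate helpers) is hypothetical and only illustrates the lookup-by-class idea, not the actual ApexPipelineTranslator code:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a class-keyed translator registry; not Beam code. */
final class TranslatorRegistrySketch {

  /** Stand-in for a transform translator; returns a label instead of building a DAG. */
  interface Translator<T> {
    String translate(T transform);
  }

  /** Stand-ins for two transform types (assumptions for illustration only). */
  static final class Single {}

  static final class Multi {}

  private static final Map<Class<?>, Translator<?>> TRANSLATORS = new HashMap<>();

  static {
    // Register one translator per supported transform class.
    register(Single.class, t -> "single");
    register(Multi.class, t -> "multi");
  }

  static <T> void register(Class<T> type, Translator<T> translator) {
    TRANSLATORS.put(type, translator);
  }

  /** Looks up the translator by the transform's runtime class and applies it. */
  @SuppressWarnings("unchecked")
  static <T> String translate(T transform) {
    Translator<T> translator = (Translator<T>) TRANSLATORS.get(transform.getClass());
    if (translator == null) {
      throw new UnsupportedOperationException(
          "no translator registered for " + transform.getClass().getName());
    }
    return translator.translate(transform);
  }
}
```

With a table like this, supporting a new transform type is a one-line registration, which matches the shape of the change in this commit.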

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index ae79a20..e2ebc29 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -230,7 +230,7 @@ public class ApexRunner extends PipelineRunner {
* Records that the {@link PTransform} requires a deterministic key coder.
*/
   private void recordViewUsesNonDeterministicKeyCoder(PTransform ptransform) {
-throw new UnsupportedOperationException();
+//throw new UnsupportedOperationException();
   }
 
   /**
@@ -369,7 +369,6 @@ public class ApexRunner extends PipelineRunner {
 
 private final ApexRunner runner;
 
-@SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
 public StreamingViewAsMap(ApexRunner r

[09/39] incubator-beam git commit: BEAM-261 Enable checkstyle and cleanup.

2016-11-11 Thread kenn
BEAM-261 Enable checkstyle and cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9454b3bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9454b3bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9454b3bd

Branch: refs/heads/master
Commit: 9454b3bdc6f6ff69363dcd339cfb069c2c2f8cc9
Parents: 1ec7cd9
Author: Thomas Weise 
Authored: Sun Oct 16 17:36:01 2016 -0700
Committer: Thomas Weise 
Committed: Mon Oct 17 09:22:49 2016 -0700

--
 runners/apex/pom.xml|   2 -
 .../runners/apex/ApexPipelineTranslator.java|  39 +--
 .../apache/beam/runners/apex/ApexRunner.java| 314 +++
 .../beam/runners/apex/ApexRunnerResult.java |  23 +-
 .../beam/runners/apex/TestApexRunner.java   |   9 +-
 .../apache/beam/runners/apex/package-info.java  |  22 ++
 .../translators/CreateValuesTranslator.java |  12 +-
 .../FlattenPCollectionTranslator.java   |  13 +-
 .../apex/translators/GroupByKeyTranslator.java  |   4 +-
 .../translators/ParDoBoundMultiTranslator.java  |  47 +--
 .../apex/translators/ParDoBoundTranslator.java  |   5 +-
 .../translators/ReadUnboundedTranslator.java|   4 +-
 .../apex/translators/TransformTranslator.java   |   8 +-
 .../apex/translators/TranslationContext.java|  40 +--
 .../functions/ApexFlattenOperator.java  |  42 ++-
 .../functions/ApexGroupByKeyOperator.java   | 155 +
 .../functions/ApexParDoOperator.java| 140 -
 .../translators/functions/package-info.java |  22 ++
 .../io/ApexReadUnboundedInputOperator.java  |  57 ++--
 .../apex/translators/io/ValuesSource.java   |  23 +-
 .../apex/translators/io/package-info.java   |  22 ++
 .../runners/apex/translators/package-info.java  |  22 ++
 .../apex/translators/utils/ApexStreamTuple.java |  85 +++--
 .../utils/CoderAdapterStreamCodec.java  |  24 +-
 .../apex/translators/utils/NoOpStepContext.java |   7 +-
 .../utils/SerializablePipelineOptions.java  |  21 +-
 .../utils/ValueAndCoderKryoSerializable.java|  26 +-
 .../apex/translators/utils/package-info.java|  22 ++
 .../beam/runners/apex/examples/IntTest.java | 133 
 .../apex/examples/StreamingWordCountTest.java   |  15 +-
 .../apex/examples/UnboundedTextSource.java  |  16 +-
 .../runners/apex/examples/package-info.java |  22 ++
 .../FlattenPCollectionTranslatorTest.java   |  32 +-
 .../translators/GroupByKeyTranslatorTest.java   |  45 ++-
 .../translators/ParDoBoundTranslatorTest.java   |  20 +-
 .../translators/ReadUnboundTranslatorTest.java  |  45 ++-
 .../translators/utils/CollectionSource.java |  13 +-
 .../translators/utils/PipelineOptionsTest.java  |  28 +-
 .../apex/src/test/resources/log4j.properties|   8 +-
 39 files changed, 662 insertions(+), 925 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 929feb4..8b62410 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -148,12 +148,10 @@
   
 
 
-  
 
   
   

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index a16f551..a6857ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -18,6 +18,11 @@
 
 package org.apache.beam.runners.apex;
 
+import com.datatorrent.api.DAG;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
 import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
@@ -43,18 +48,13 @@ import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * {@link ApexPipelineTranslator} translates {@link Pipeline} objects
  * into Apex logical plan {@link DAG}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-  ApexPipelineTranslator.class);
+  private static final Logger LOG = LoggerFactory.g

[34/39] incubator-beam git commit: BEAM-261 Make translators package private.

2016-11-11 Thread kenn
BEAM-261 Make translators package private.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5553c603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5553c603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5553c603

Branch: refs/heads/master
Commit: 5553c603a0c48855d38d4702f19e905eac2034f2
Parents: 9197d1e
Author: Thomas Weise 
Authored: Thu Oct 27 16:19:15 2016 -0700
Committer: Thomas Weise 
Committed: Mon Nov 7 22:33:46 2016 +0100

--
 .../runners/apex/ApexPipelineTranslator.java| 185 ---
 .../apache/beam/runners/apex/ApexRunner.java|  13 +-
 .../beam/runners/apex/ApexRunnerResult.java |   2 +-
 .../translation/ApexPipelineTranslator.java | 179 +++
 .../translation/CreateValuesTranslator.java |  48 ++
 .../FlattenPCollectionTranslator.java   | 129 +
 .../apex/translation/GroupByKeyTranslator.java  |  42 ++
 .../translation/ParDoBoundMultiTranslator.java  | 142 ++
 .../apex/translation/ParDoBoundTranslator.java  |  64 +++
 .../translation/ReadUnboundedTranslator.java|  42 ++
 .../apex/translation/TransformTranslator.java   |  31 ++
 .../apex/translation/TranslationContext.java| 178 +++
 .../operators/ApexFlattenOperator.java  | 125 +
 .../operators/ApexGroupByKeyOperator.java   | 478 +++
 .../operators/ApexParDoOperator.java| 375 +++
 .../ApexReadUnboundedInputOperator.java | 155 ++
 .../translation/operators/package-info.java |  22 +
 .../runners/apex/translation/package-info.java  |  22 +
 .../translation/utils/ApexStateInternals.java   | 438 +
 .../apex/translation/utils/ApexStreamTuple.java | 222 +
 .../utils/CoderAdapterStreamCodec.java  |  69 +++
 .../apex/translation/utils/NoOpStepContext.java |  72 +++
 .../utils/SerializablePipelineOptions.java  |  60 +++
 .../utils/ValueAndCoderKryoSerializable.java|  77 +++
 .../apex/translation/utils/ValuesSource.java| 149 ++
 .../apex/translation/utils/package-info.java|  22 +
 .../translators/CreateValuesTranslator.java |  48 --
 .../FlattenPCollectionTranslator.java   | 129 -
 .../apex/translators/GroupByKeyTranslator.java  |  42 --
 .../translators/ParDoBoundMultiTranslator.java  | 142 --
 .../apex/translators/ParDoBoundTranslator.java  |  64 ---
 .../translators/ReadUnboundedTranslator.java|  42 --
 .../apex/translators/TransformTranslator.java   |  31 --
 .../apex/translators/TranslationContext.java| 178 ---
 .../functions/ApexFlattenOperator.java  | 125 -
 .../functions/ApexGroupByKeyOperator.java   | 478 ---
 .../functions/ApexParDoOperator.java| 375 ---
 .../translators/functions/package-info.java |  22 -
 .../io/ApexReadUnboundedInputOperator.java  | 154 --
 .../apex/translators/io/ValuesSource.java   | 149 --
 .../apex/translators/io/package-info.java   |  22 -
 .../runners/apex/translators/package-info.java  |  22 -
 .../translators/utils/ApexStateInternals.java   | 438 -
 .../apex/translators/utils/ApexStreamTuple.java | 222 -
 .../utils/CoderAdapterStreamCodec.java  |  69 ---
 .../apex/translators/utils/NoOpStepContext.java |  72 ---
 .../utils/SerializablePipelineOptions.java  |  60 ---
 .../utils/ValueAndCoderKryoSerializable.java|  77 ---
 .../apex/translators/utils/package-info.java|  22 -
 .../translation/ApexGroupByKeyOperatorTest.java | 112 +
 .../FlattenPCollectionTranslatorTest.java   |  99 
 .../translation/GroupByKeyTranslatorTest.java   | 246 ++
 .../translation/ParDoBoundTranslatorTest.java   | 340 +
 .../translation/ReadUnboundTranslatorTest.java  | 129 +
 .../utils/ApexStateInternalsTest.java   | 361 ++
 .../translation/utils/CollectionSource.java | 136 ++
 .../translation/utils/PipelineOptionsTest.java  |  84 
 .../translators/ApexGroupByKeyOperatorTest.java | 112 -
 .../FlattenPCollectionTranslatorTest.java   |  99 
 .../translators/GroupByKeyTranslatorTest.java   | 246 --
 .../translators/ParDoBoundTranslatorTest.java   | 340 -
 .../translators/ReadUnboundTranslatorTest.java  | 129 -
 .../utils/ApexStateInternalsTest.java   | 361 --
 .../translators/utils/CollectionSource.java | 136 --
 .../translators/utils/PipelineOptionsTest.java  |  84 
 65 files changed, 4653 insertions(+), 4685 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
-

[32/39] incubator-beam git commit: BEAM-261 Make translators package private.

2016-11-11 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
deleted file mode 100644
index 07c6494..000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.Operator.OutputPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * Maintains context data for {@link TransformTranslator}s.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class TranslationContext {
-
-  private final ApexPipelineOptions pipelineOptions;
-  private AppliedPTransform currentTransform;
-  private final Map, List>>> streams = new HashMap<>();
-  private final Map operators = new HashMap<>();
-  private final Map, PInput> viewInputs = new HashMap<>();
-
-  public void addView(PCollectionView view) {
-this.viewInputs.put(view, this.getInput());
-  }
-
-  public  InputT getViewInput(PCollectionView view) {
-PInput input = this.viewInputs.get(view);
-checkArgument(input != null, "unknown view " + view.getName());
-return (InputT) input;
-  }
-
-  public TranslationContext(ApexPipelineOptions pipelineOptions) {
-this.pipelineOptions = pipelineOptions;
-  }
-
-  public void setCurrentTransform(TransformTreeNode treeNode) {
-this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
-treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
-  }
-
-  public ApexPipelineOptions getPipelineOptions() {
-return pipelineOptions;
-  }
-
-  public  InputT getInput() {
-return (InputT) getCurrentTransform().getInput();
-  }
-
-  public  OutputT getOutput() {
-return (OutputT) getCurrentTransform().getOutput();
-  }
-
-  private AppliedPTransform getCurrentTransform() {
-checkArgument(currentTransform != null, "current transform not set");
-return currentTransform;
-  }
-
-  public void addOperator(Operator operator, OutputPort port) {
-addOperator(operator, port, this.>getOutput());
-  }
-
-  /**
-   * Register operator and output ports for the given collections.
-   * @param operator
-   * @param ports
-   */
-  public void addOperator(Operator operator, Map, OutputPort> ports) {
-boolean first = true;
-for (Map.Entry, OutputPort> portEntry : ports.entrySet()) {
-  if (first) {
-addOperator(operator, portEntry.getValue(), portEntry.getKey());
-first = false;
-  } else {
-this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
-new ArrayList

[33/39] incubator-beam git commit: BEAM-261 Make translators package private.

2016-11-11 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
new file mode 100644
index 000..44e7b11
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link DoFn}.
+ */
+public class ApexParDoOperator extends BaseOperator implements OutputManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
+  private boolean traceTuples = true;
+
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions pipelineOptions;
+  @Bind(JavaSerializer.class)
+  private final OldDoFn doFn;
+  @Bind(JavaSerializer.class)
+  private final TupleTag mainOutputTag;
+  @Bind(JavaSerializer.class)
+  private final List> sideOutputTags;
+  @Bind(JavaSerializer.class)
+  private final WindowingStrategy windowingStrategy;
+  @Bind(JavaSerializer.class)
+  private final List> sideInputs;
+
+  private final StateInternals sideInputStateInternals;
+  private final ValueAndCoderKryoSerializable>> pushedBack;
+  private LongMin pushedBackWatermark = new LongMin();
+  private long currentInputWatermark = Long.MIN_VALUE;
+  private long currentOutputWatermark = currentInputWatermark;
+
+  private transient PushbackSideInputDoFnRunner pushbackDoFnRunner;
+  private transient SideInputHandler sideInputHandler;
+  private transient M

[13/39] incubator-beam git commit: Merge branch 'master' into apex-runner

2016-11-11 Thread kenn
Merge branch 'master' into apex-runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a971d6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a971d6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a971d6b

Branch: refs/heads/master
Commit: 6a971d6b13a035f74ce1b2d2efc6d37f84a2373c
Parents: c9f1406 7de8d57
Author: Thomas Weise 
Authored: Tue Oct 25 09:07:50 2016 -0700
Committer: Thomas Weise 
Committed: Tue Oct 25 09:07:50 2016 -0700

--
 .travis.yml |  10 +-
 examples/java/pom.xml   |   2 +-
 .../beam/examples/DebuggingWordCount.java   |   2 +-
 .../apache/beam/examples/WindowedWordCount.java |   2 +-
 .../org/apache/beam/examples/WordCount.java |   4 +-
 .../common/ExampleBigQueryTableOptions.java |   2 +-
 ...xamplePubsubTopicAndSubscriptionOptions.java |   2 +-
 .../common/ExamplePubsubTopicOptions.java   |   2 +-
 .../beam/examples/complete/AutoComplete.java|   2 +-
 .../apache/beam/examples/complete/TfIdf.java|   2 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/cookbook/BigQueryTornadoes.java|   2 +-
 .../cookbook/CombinePerKeyExamples.java |   2 +-
 .../beam/examples/cookbook/DeDupExample.java|   4 +-
 .../beam/examples/cookbook/FilterExamples.java  |   2 +-
 .../beam/examples/cookbook/JoinExamples.java|   2 +-
 .../examples/cookbook/MaxPerKeyExamples.java|   2 +-
 .../beam/examples/WindowedWordCountIT.java  |  11 +
 .../examples/cookbook/BigQueryTornadoesIT.java  |  14 +-
 examples/java8/pom.xml  |   2 +-
 .../beam/examples/complete/game/GameStats.java  |   2 +-
 .../examples/complete/game/HourlyTeamScore.java |   2 +-
 .../examples/complete/game/LeaderBoard.java |   2 +-
 .../beam/examples/complete/game/UserScore.java  |   2 +-
 examples/pom.xml|   2 +-
 pom.xml |  17 +-
 runners/core-java/pom.xml   |   2 +-
 .../beam/runners/core/AggregatorFactory.java|  39 ++
 .../apache/beam/runners/core/DoFnRunner.java|   6 +-
 .../apache/beam/runners/core/DoFnRunners.java   | 191 --
 .../runners/core/ElementByteSizeObservable.java |   5 +-
 .../runners/core/GBKIntoKeyedWorkItems.java |  17 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   5 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   5 +
 .../runners/core/ReduceFnContextFactory.java|   2 +-
 .../beam/runners/core/ReduceFnRunner.java   |  39 +-
 .../beam/runners/core/SimpleDoFnRunner.java | 337 ++
 .../beam/runners/core/SimpleOldDoFnRunner.java  | 521 +++
 .../apache/beam/runners/core/TriggerRunner.java | 247 ---
 .../core/UnboundedReadFromBoundedSource.java|   2 +-
 .../core/triggers/AfterAllStateMachine.java |   5 +
 .../AfterDelayFromFirstElementStateMachine.java |  17 +-
 .../core/triggers/AfterEachStateMachine.java|   5 +
 .../core/triggers/AfterFirstStateMachine.java   |   6 +
 .../core/triggers/AfterPaneStateMachine.java|   7 +
 .../triggers/AfterWatermarkStateMachine.java|   4 +-
 .../runners/core/triggers/FinishedTriggers.java |   8 +-
 .../core/triggers/TriggerStateMachine.java  |   4 +-
 .../triggers/TriggerStateMachineRunner.java |   2 +-
 .../core/triggers/TriggerStateMachines.java | 215 +++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 281 
 .../beam/runners/core/ReduceFnTester.java   | 157 +++--
 .../beam/runners/core/SimpleDoFnRunnerTest.java |  88 ---
 .../runners/core/SimpleOldDoFnRunnerTest.java   |  88 +++
 .../beam/runners/core/SplittableParDoTest.java  |  14 +-
 .../core/triggers/TriggerStateMachinesTest.java | 199 ++
 runners/direct-java/pom.xml |  18 +-
 .../direct/BoundedReadEvaluatorFactory.java |   3 +-
 .../beam/runners/direct/DirectMetrics.java  |   7 +
 .../beam/runners/direct/DirectRunner.java   |   2 +-
 .../runners/direct/DoFnLifecycleManager.java|  42 +-
 .../runners/direct/ForwardingPTransform.java|   2 +-
 .../beam/runners/direct/ParDoEvaluator.java |   4 +-
 .../direct/ParDoMultiEvaluatorFactory.java  |   6 +-
 .../direct/ParDoSingleEvaluatorFactory.java |   5 +-
 .../direct/TestStreamEvaluatorFactory.java  |   2 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |  15 +-
 .../beam/runners/direct/WatermarkManager.java   |   4 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  32 -
 ...leManagerRemovingTransformEvaluatorTest.java |  16 +-
 .../direct/DoFnLifecycleManagerTest.java|  12 +-
 .../direct/DoFnLifecycleManagersTest.java   |  48 +-
 .../direct/ForwardingPTransformTest.java|   7 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |  11 +
 .../direct/ParDoSingleEvalua

[27/39] incubator-beam git commit: Closes #1269

2016-11-11 Thread kenn
Closes #1269


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15287b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15287b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15287b83

Branch: refs/heads/master
Commit: 15287b83860d49e595d59d3e178cc3fecef4c310
Parents: 51af7e5 a21550f
Author: Thomas Weise 
Authored: Mon Nov 7 10:59:22 2016 -0800
Committer: Thomas Weise 
Committed: Mon Nov 7 10:59:22 2016 -0800

--
 runners/apex/README.md  |  76 
 runners/apex/pom.xml|   1 +
 .../apache/beam/runners/apex/ApexRunner.java|  82 
 .../functions/ApexGroupByKeyOperator.java   |   2 +-
 .../io/ApexReadUnboundedInputOperator.java  |   2 +-
 .../apex/translators/utils/ApexStreamTuple.java |  21 ++-
 .../apex/examples/StreamingWordCountTest.java   | 121 
 .../runners/apex/examples/WordCountTest.java| 188 +++
 .../translators/ApexGroupByKeyOperatorTest.java | 112 +++
 .../translators/GroupByKeyTranslatorTest.java   |   1 +
 runners/apex/src/test/resources/words.txt   |   3 +
 11 files changed, 484 insertions(+), 125 deletions(-)
--




[19/39] incubator-beam git commit: BEAM-261 Skip integration tests unless running with -Plocal-runnable-on-service-tests

2016-11-11 Thread kenn
BEAM-261 Skip integration tests unless running with -Plocal-runnable-on-service-tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52d233aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52d233aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52d233aa

Branch: refs/heads/master
Commit: 52d233aa10dd71ba5f396d0ac76a29f9a3e30f5b
Parents: 8827ccf
Author: Thomas Weise 
Authored: Wed Oct 26 12:19:25 2016 -0700
Committer: Thomas Weise 
Committed: Wed Oct 26 12:19:25 2016 -0700

--
 runners/apex/pom.xml | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52d233aa/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 1d52af7..6ccc0da 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -37,11 +37,21 @@
   
 3.5.0-SNAPSHOT
 3.4.0
-false
+true
 
 -Xmx2048m
   
 
+  
+
+  local-runnable-on-service-tests
+  false
+  
+false
+  
+
+  
+
   
 
 



[28/39] incubator-beam git commit: Merge branch 'master' into apex-runner

2016-11-11 Thread kenn
Merge branch 'master' into apex-runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ccd5fb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ccd5fb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ccd5fb3

Branch: refs/heads/master
Commit: 5ccd5fb3c62ebdf1310af4af17cab2270b4a0d96
Parents: 15287b8 1102455
Author: Thomas Weise 
Authored: Mon Nov 7 20:12:36 2016 +0100
Committer: Thomas Weise 
Committed: Mon Nov 7 20:12:36 2016 +0100

--
 .travis.yml |   6 +-
 examples/java/pom.xml   |  79 --
 .../beam/examples/common/ExampleUtils.java  |   3 +-
 .../apache/beam/examples/complete/TfIdf.java|   6 +-
 .../beam/examples/cookbook/DeDupExample.java|   8 +-
 .../org/apache/beam/examples/cookbook/README.md |   2 +-
 .../beam/examples/WindowedWordCountIT.java  |  11 +-
 .../org/apache/beam/examples/WordCountIT.java   |  19 +-
 .../beam/examples/complete/TfIdfTest.java   |   4 +-
 .../examples/cookbook/BigQueryTornadoesIT.java  |  11 +-
 .../examples/cookbook/DeDupExampleTest.java |  82 --
 .../examples/cookbook/DistinctExampleTest.java  |  82 ++
 examples/java8/pom.xml  |  19 -
 .../beam/examples/complete/game/GameStats.java  |   3 +-
 .../examples/complete/game/LeaderBoard.java |   2 +-
 .../beam/examples/complete/game/UserScore.java  |   2 +-
 .../examples/MinimalWordCountJava8Test.java |   2 +-
 examples/pom.xml|  13 +-
 pom.xml | 107 ++-
 runners/core-java/pom.xml   |  65 --
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   3 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   3 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java   |  13 +-
 .../core/PushbackSideInputDoFnRunner.java   |  28 +-
 .../beam/runners/core/ReduceFnTester.java   |   6 +-
 .../UnboundedReadFromBoundedSourceTest.java |   4 +-
 runners/direct-java/pom.xml | 102 ---
 .../runners/direct/AggregatorContainer.java |  20 +-
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  16 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  42 +-
 .../direct/DirectGroupByKeyOverrideFactory.java |  25 +-
 .../runners/direct/DirectTimerInternals.java|  13 +
 .../beam/runners/direct/EvaluationContext.java  |  18 +-
 .../direct/ExecutorServiceParallelExecutor.java |  41 +-
 .../GroupAlsoByWindowEvaluatorFactory.java  | 249 --
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  31 +-
 .../direct/PTransformOverrideFactory.java   |   8 +-
 .../beam/runners/direct/ParDoEvaluator.java |  28 +-
 .../runners/direct/ParDoEvaluatorFactory.java   | 126 
 .../direct/ParDoMultiEvaluatorFactory.java  | 107 ---
 .../direct/ParDoMultiEvaluatorHooks.java|  55 ++
 .../runners/direct/ParDoOverrideFactory.java|  14 +-
 .../direct/ParDoSingleEvaluatorFactory.java | 110 ---
 .../direct/ParDoSingleEvaluatorHooks.java   |  58 ++
 .../direct/TestStreamEvaluatorFactory.java  |  14 +-
 .../direct/TransformEvaluatorRegistry.java  |  10 +-
 .../direct/UncommittedBundleOutputManager.java  |  50 --
 .../runners/direct/ViewEvaluatorFactory.java|  19 +-
 .../direct/WatermarkCallbackExecutor.java   |   3 +-
 .../beam/runners/direct/WatermarkManager.java   |  79 +-
 .../direct/WriteWithShardingFactory.java|  15 +-
 .../runners/direct/EvaluationContextTest.java   |  23 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |  29 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java |  31 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  16 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 439 ---
 .../direct/ParDoMultiEvaluatorHooksTest.java| 439 +++
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 335 -
 .../direct/ParDoSingleEvaluatorHooksTest.java   | 335 +
 .../runners/direct/WatermarkManagerTest.java| 102 +--
 .../direct/WriteWithShardingFactoryTest.java|  26 +-
 runners/flink/examples/pom.xml  |  17 -
 .../beam/runners/flink/examples/TFIDF.java  |   6 +-
 .../flink/examples/streaming/AutoComplete.java  |   8 +-
 .../flink/examples/streaming/JoinExamples.java  |  12 +-
 .../examples/streaming/KafkaIOExamples.java |   2 +-
 .../examples/streaming/WindowedWordCount.java   |   8 +-
 runners/flink/runner/pom.xml|  17 -
 .../wrappers/streaming/DoFnOperator.java|  10 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |  13 +
 runners/google-cloud-dataflow-java/pom.xml  | 114 ---
 .../dataflow/BlockingDataflowRunner.java| 170 -
 .../dataflow/DataflowJobCancelledException.java |  39 -
 .../dataflow/DataflowJobExecutionException.java |  35 -
 .../dataflow/DataflowJobUpdate

[20/39] incubator-beam git commit: Closes #1186

2016-11-11 Thread kenn
Closes #1186


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e92157b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e92157b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e92157b3

Branch: refs/heads/master
Commit: e92157b37fefa0931a63191e24b06fd8df2f7a32
Parents: 8827ccf 1db4ff6
Author: Thomas Weise 
Authored: Wed Oct 26 13:21:18 2016 -0700
Committer: Thomas Weise 
Committed: Wed Oct 26 13:21:18 2016 -0700

--
 .../apex/translators/GroupByKeyTranslator.java  |   3 +-
 .../translators/ParDoBoundMultiTranslator.java  |   4 +-
 .../apex/translators/ParDoBoundTranslator.java  |   4 +-
 .../apex/translators/TranslationContext.java|  10 +
 .../functions/ApexGroupByKeyOperator.java   |  12 +-
 .../functions/ApexParDoOperator.java|  11 +-
 .../translators/utils/ApexStateInternals.java   | 438 +++
 .../translators/ParDoBoundTranslatorTest.java   |  62 ++-
 .../utils/ApexStateInternalsTest.java   | 361 +++
 9 files changed, 883 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
--
diff --cc 
runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 72b4299,9ea4233..6f50398
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@@ -198,99 -200,51 +208,133 @@@ public class ParDoBoundTranslatorTest 
  .apply(Sum.integersGlobally().asSingletonView());
  
  ApexParDoOperator operator = new 
ApexParDoOperator<>(options,
- new Add(0), new TupleTag(), TupleTagList.empty().getAll(),
+ new Add(singletonView), new TupleTag(), 
TupleTagList.empty().getAll(),
  WindowingStrategy.globalDefault(),
  Collections.>singletonList(singletonView),
- coder);
+ coder,
+ new ApexStateInternals.ApexStateInternalsFactory()
+ );
  operator.setup(null);
  operator.beginWindow(0);
- WindowedValue wv = WindowedValue.valueInGlobalWindow(0);
- operator.input.process(ApexStreamTuple.DataTuple.of(wv));
- 
operator.input.process(ApexStreamTuple.WatermarkTuple.>of(0));
- operator.endWindow();
- Assert.assertNotNull("Serialization", 
KryoCloneUtils.cloneObject(operator));
+ WindowedValue wv1 = WindowedValue.valueInGlobalWindow(1);
+ WindowedValue> sideInput = 
WindowedValue.>valueInGlobalWindow(
+ Lists.newArrayList(22));
+ operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back 
input
  
+ final List results = Lists.newArrayList();
+ Sink sink =  new Sink() {
+   @Override
+   public void put(Object tuple) {
+ results.add(tuple);
+   }
+   @Override
+   public int getCount(boolean reset) {
+ return 0;
+   }
+ };
+ 
+ // verify pushed back input checkpointing
+ Assert.assertNotNull("Serialization", operator = 
KryoCloneUtils.cloneObject(operator));
+ operator.output.setSink(sink);
+ operator.setup(null);
+ operator.beginWindow(1);
+ WindowedValue wv2 = WindowedValue.valueInGlobalWindow(2);
+ operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput));
+ Assert.assertEquals("number outputs", 1, results.size());
+ Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23),
+ ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
+ 
+ // verify side input checkpointing
+ results.clear();
+ Assert.assertNotNull("Serialization", operator = 
KryoCloneUtils.cloneObject(operator));
+ operator.output.setSink(sink);
+ operator.setup(null);
+ operator.beginWindow(2);
+ operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+ Assert.assertEquals("number outputs", 1, results.size());
+ Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24),
+ ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
}
 +
 +  @Test
 +  public void testMultiOutputParDoWithSideInputs() throws Exception {
 +ApexPipelineOptions options = 
PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
 +options.setRunner(ApexRunner.class); // non-blocking run
 +Pipeline pipeline = Pipeline.c

[36/39] incubator-beam git commit: Fix findbugs issues.

2016-11-11 Thread kenn
Fix findbugs issues.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99001575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99001575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99001575

Branch: refs/heads/master
Commit: 99001575d266798cb5537c8a025735a095ac535e
Parents: c08ebbe
Author: Thomas Weise 
Authored: Tue Nov 8 05:02:26 2016 +0100
Committer: Thomas Weise 
Committed: Tue Nov 8 09:22:13 2016 +0100

--
 .../src/main/java/org/apache/beam/runners/apex/ApexRunner.java  | 5 +++--
 .../java/org/apache/beam/runners/apex/ApexRunnerResult.java | 4 ++--
 .../apex/translation/operators/ApexGroupByKeyOperator.java  | 5 +
 .../runners/apex/translation/operators/ApexParDoOperator.java   | 2 +-
 .../runners/apex/translation/ApexGroupByKeyOperatorTest.java| 5 +
 5 files changed, 12 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index b42dddf..5ce4fef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -28,6 +28,7 @@ import com.google.common.base.Throwables;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
 import org.apache.beam.runners.core.AssignWindows;
@@ -73,7 +74,7 @@ public class ApexRunner extends 
PipelineRunner {
* Holds any most resent assertion error that was raised while processing 
elements.
* Used in the unit test driver in embedded mode to propagate the exception.
*/
-  public static volatile AssertionError assertionError;
+  public static final AtomicReference ASSERTION_ERROR = new 
AtomicReference<>();
 
   public ApexRunner(ApexPipelineOptions options) {
 this.options = options;
@@ -141,7 +142,7 @@ public class ApexRunner extends 
PipelineRunner {
 // turns off timeout checking for operator progress
 lc.setHeartbeatMonitoringEnabled(false);
   }
-  assertionError = null;
+  ApexRunner.ASSERTION_ERROR.set(null);
   lc.runAsync();
   return new ApexRunnerResult(lma.getDAG(), lc);
 } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 3ae69f2..18b50bc 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -95,8 +95,8 @@ public class ApexRunnerResult implements PipelineResult {
   appDoneField = ctrl.getClass().getDeclaredField("appDone");
   appDoneField.setAccessible(true);
   while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < 
timeout) {
-if (ApexRunner.assertionError != null) {
-  throw ApexRunner.assertionError;
+if (ApexRunner.ASSERTION_ERROR.get() != null) {
+  throw ApexRunner.ASSERTION_ERROR.get();
 }
 Thread.sleep(500);
   }
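
Note on the two hunks above: the change swaps a public static volatile field for a static final AtomicReference. A minimal, self-contained sketch of that pattern follows. It is not the committed code: the class and method names (AssertionErrorHolderSketch, record, rethrowIfFailed) are hypothetical, and unlike the hunk above it reads the reference once into a local before rethrowing, which is an assumption about what a caller might prefer rather than what ApexRunnerResult does.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the ASSERTION_ERROR holder shown in the diff above. */
public class AssertionErrorHolderSketch {
  // A final reference to a mutable, thread-safe cell replaces the mutable static field.
  static final AtomicReference<AssertionError> ASSERTION_ERROR = new AtomicReference<>();

  /** Records a failure, as a worker thread might while processing elements. */
  static void record(AssertionError e) {
    ASSERTION_ERROR.set(e);
  }

  /** Reads the reference once and rethrows the recorded error, if any. */
  static void rethrowIfFailed() {
    AssertionError e = ASSERTION_ERROR.get();
    if (e != null) {
      throw e;
    }
  }

  public static void main(String[] args) {
    ASSERTION_ERROR.set(null); // reset before a run
    rethrowIfFailed();         // nothing recorded yet, so this returns normally
    record(new AssertionError("boom"));
    try {
      rethrowIfFailed();
      System.out.println("not thrown");
    } catch (AssertionError e) {
      System.out.println("rethrown: " + e.getMessage());
    }
  }
}
```

Reading into a local means the null check and the throw see the same value even if another thread resets the reference in between.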

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1b5e693..8fbfb03 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -32,7 +32,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -467,9 +466,7 @@ public class ApexGroupByKeyOperator implements 
Operato

[21/39] incubator-beam git commit: Closes #1194

2016-11-11 Thread kenn
Closes #1194


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa3a6aa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa3a6aa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa3a6aa8

Branch: refs/heads/master
Commit: fa3a6aa8d88b24d44d4ca5968e1f26efb4c9dea0
Parents: e92157b 52d233a
Author: Thomas Weise 
Authored: Wed Oct 26 15:14:10 2016 -0700
Committer: Thomas Weise 
Committed: Wed Oct 26 15:14:10 2016 -0700

--
 runners/apex/pom.xml | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)
--




[18/39] incubator-beam git commit: Closes #1167

2016-11-11 Thread kenn
Closes #1167


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8827ccf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8827ccf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8827ccf6

Branch: refs/heads/master
Commit: 8827ccf6b6d648793065df1d5b2c6997f0e6be1e
Parents: 989e399 c8f8a80
Author: Thomas Weise 
Authored: Tue Oct 25 16:32:20 2016 -0700
Committer: Thomas Weise 
Committed: Tue Oct 25 16:32:20 2016 -0700

--
 runners/apex/pom.xml|  6 +-
 .../apache/beam/runners/apex/ApexRunner.java|  5 +-
 .../beam/runners/apex/ApexRunnerRegistrar.java  | 61 
 .../runners/apex/ApexRunnerRegistrarTest.java   | 47 +++
 4 files changed, 115 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8827ccf6/runners/apex/pom.xml
--



[30/39] incubator-beam git commit: BEAM-261 Make translators package private.

2016-11-11 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
--
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
deleted file mode 100644
index 2379a9e..000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Sink;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * integration test for {@link ParDoBoundTranslator}.
- */
-@RunWith(JUnit4.class)
-public class ParDoBoundTranslatorTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorTest.class);
-  private static final long SLEEP_MILLIS = 500;
-  private static final long TIMEOUT_MILLIS = 3;
-
-  @Test
-  public void test() throws Exception {
-ApexPipelineOptions options = PipelineOptionsFactory.create()
-.as(ApexPipelineOptions.class);
-options.setApplicationName("ParDoBound");
-options.setRunner(ApexRunner.class);
-
-Pipeline p = Pipeline.create(options);
-
-List collection = Lists.newArrayList(1, 2, 3, 4, 5);
-List expected = Lists.newArrayList(6, 7, 8, 9, 10);
-
p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class)))
-.apply(ParDo.of(new Add(5)))
-.apply(ParDo.of(new EmbeddedCollector()));
-
-ApexRunnerResult result = (ApexRunnerResult) p.run();
-DAG dag = result.getApexDAG();
-
-DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values");
-Assert.assertNotNull(om);
-Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class);
-
-om = dag.getOperatorMeta("ParDo(Add)");
-Assert.assertNotNull(om);
-Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class);
-
-long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
-while (System.currentTimeMilli

[GitHub] incubator-beam pull request #1305: Merge apex-runner to master

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1305




[05/39] incubator-beam git commit: BEAM-261 PCollectionView and side inputs.

2016-11-11 Thread kenn
BEAM-261 PCollectionView and side inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09754942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09754942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09754942

Branch: refs/heads/master
Commit: 09754942c66c9befffc8df9b3c8a75b819a672e6
Parents: 074b18f
Author: Thomas Weise 
Authored: Sun Sep 25 16:46:44 2016 -0700
Committer: Thomas Weise 
Committed: Sun Oct 16 23:25:55 2016 -0700

--
 .../beam/runners/apex/ApexPipelineOptions.java  |   6 +
 .../runners/apex/ApexPipelineTranslator.java|  19 +-
 .../apache/beam/runners/apex/ApexRunner.java| 397 ++-
 .../FlattenPCollectionTranslator.java   |  26 +-
 .../apex/translators/ParDoBoundTranslator.java  |  22 +-
 .../apex/translators/TranslationContext.java|  14 +-
 .../functions/ApexFlattenOperator.java  | 113 ++
 .../functions/ApexGroupByKeyOperator.java   |  78 +++-
 .../functions/ApexParDoOperator.java| 210 --
 .../io/ApexReadUnboundedInputOperator.java  |  31 +-
 .../apex/translators/utils/ApexStreamTuple.java |  11 +
 .../translators/utils/NoOpSideInputReader.java  |  47 ---
 .../beam/runners/apex/examples/IntTest.java | 133 +++
 .../beam/runners/apex/examples/IntTests.java| 207 --
 .../translators/ParDoBoundTranslatorTest.java   |  37 +-
 .../apex/src/test/resources/log4j.properties|   4 +-
 16 files changed, 1028 insertions(+), 327 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index f70d24c..141a8c1 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -50,6 +50,12 @@ public interface ApexPipelineOptions extends 
PipelineOptions, java.io.Serializab
   @Default.Boolean(true)
   boolean isEmbeddedExecutionDebugMode();
 
+  @Description("output data received and emitted on ports (for debugging)")
+  void setTupleTracingEnabled(boolean enabled);
+
+  @Default.Boolean(false)
+  boolean isTupleTracingEnabled();
+
   @Description("how long the client should wait for the pipeline to run")
   void setRunMillis(long runMillis);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index b0391b4..ad8c283 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.apex;
 
+import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
 import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
@@ -35,8 +36,8 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
 registerTransformTranslator(Flatten.FlattenPCollectionList.class,
 new FlattenPCollectionTranslator());
 registerTransformTranslator(Create.Values.class, new 
CreateValuesTranslator());
+registerTransformTranslator(CreateApexPCollectionView.class, new 
CreatePCollectionViewTranslator());
   }
 
   public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -98,7 +100,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
 PTransform transform = node.getTransform();
 TransformTranslator translator = 
getTransformTranslator(transform.getClass());
 if (null == translator) {
-  throw new IllegalStateException(
+  throw new UnsupportedOperationExceptio

[02/39] incubator-beam git commit: BEAM-261 Apex runner PoC

2016-11-11 Thread kenn
BEAM-261 Apex runner PoC


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaf38ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaf38ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaf38ddf

Branch: refs/heads/master
Commit: aaf38ddfe53bbb67fad4456ee1068d18b9b891b5
Parents: 49f9444
Author: Thomas Weise 
Authored: Mon Jun 27 11:24:13 2016 -0700
Committer: Thomas Weise 
Committed: Sun Oct 16 23:21:55 2016 -0700

--
 runners/apex/pom.xml| 226 ++
 .../beam/runners/apex/ApexPipelineOptions.java  |  60 +++
 .../runners/apex/ApexPipelineTranslator.java| 134 ++
 .../apache/beam/runners/apex/ApexRunner.java| 171 
 .../beam/runners/apex/ApexRunnerResult.java |  85 
 .../beam/runners/apex/TestApexRunner.java   |  56 +++
 .../translators/CreateValuesTranslator.java |  49 +++
 .../FlattenPCollectionTranslator.java   |  52 +++
 .../apex/translators/GroupByKeyTranslator.java  |  41 ++
 .../apex/translators/ParDoBoundTranslator.java  |  43 ++
 .../translators/ReadUnboundedTranslator.java|  42 ++
 .../apex/translators/TransformTranslator.java   |  31 ++
 .../apex/translators/TranslationContext.java| 143 +++
 .../functions/ApexGroupByKeyOperator.java   | 427 +++
 .../functions/ApexParDoOperator.java| 177 
 .../io/ApexReadUnboundedInputOperator.java  | 125 ++
 .../apex/translators/io/ValuesSource.java   | 152 +++
 .../apex/translators/utils/ApexStreamTuple.java | 191 +
 .../utils/CoderAdapterStreamCodec.java  |  73 
 .../translators/utils/NoOpSideInputReader.java  |  47 ++
 .../apex/translators/utils/NoOpStepContext.java |  73 
 .../utils/SerializablePipelineOptions.java  |  61 +++
 .../apex/examples/StreamingWordCountTest.java   | 120 ++
 .../apex/examples/UnboundedTextSource.java  | 144 +++
 .../FlattenPCollectionTranslatorTest.java   |  97 +
 .../translators/GroupByKeyTranslatorTest.java   | 248 +++
 .../translators/ParDoBoundTranslatorTest.java   | 164 +++
 .../translators/ReadUnboundTranslatorTest.java  | 130 ++
 .../translators/utils/CollectionSource.java | 137 ++
 .../translators/utils/PipelineOptionsTest.java  |  82 
 .../apex/src/test/resources/log4j.properties|  33 ++
 runners/pom.xml |   1 +
 32 files changed, 3615 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
new file mode 100644
index 000..bb08b3c
--- /dev/null
+++ b/runners/apex/pom.xml
@@ -0,0 +1,226 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  4.0.0
+
+  
+org.apache.beam
+beam-runners-parent
+0.3.0-incubating-SNAPSHOT
+../pom.xml
+  
+
+  beam-runners-apex_3.4.0
+
+  Apache Beam :: Runners :: Apex
+
+  jar
+
+  
+3.4.0
+3.4.0
+true
+
+-Xmx2048m
+  
+
+  
+
+
+  org.apache.apex
+  apex-common
+  ${apex.core.version}
+
+
+  org.apache.apex
+  malhar-library
+  ${apex.malhar.version}
+
+
+   com.fasterxml.jackson.core
+   jackson-databind
+
+
+  org.apache.apex
+  apex-engine
+  ${apex.core.version}
+  test
+
+
+
+
+  org.apache.beam
+  beam-sdks-java-core
+  
+
+  org.slf4j
+  slf4j-jdk14
+
+  
+
+
+
+  org.apache.beam
+  beam-runners-core-java
+  
+
+  org.slf4j
+  slf4j-jdk14
+
+  
+
+
+
+   
+   com.google.code.findbugs
+   annotations
+
+
+
+
+
+  org.hamcrest
+  hamcrest-all
+  test
+
+
+  junit
+  junit
+  test
+
+
+  org.mockito
+  mockito-all
+  test
+
+
+
+
+  org.apache.beam
+  beam-sdks-java-core
+  tests
+  test
+  
+
+  org.slf4j
+  slf4j-jdk14
+
+  
+
+
+
+
+
+  
+
+  
+
+
+  
+
+  
+  
+org.apache.maven.plugins
+maven-failsafe-plugin
+  
+
+  
+  
+org.apache.maven.plugins
+maven-surefire-plugin
+
+  ${surefire.args}
+
+
+  
+runnable-on-service-tests
+integration-test
+
+  test
+
+
+

[15/39] incubator-beam git commit: BEAM-784 Checkpointing for StateInternals

2016-11-11 Thread kenn
BEAM-784 Checkpointing for StateInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1db4ff63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1db4ff63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1db4ff63

Branch: refs/heads/master
Commit: 1db4ff631736172882976c33316bc089d58483af
Parents: 0a1b278
Author: Thomas Weise 
Authored: Tue Oct 25 08:32:23 2016 -0700
Committer: Thomas Weise 
Committed: Tue Oct 25 10:06:12 2016 -0700

--
 .../apex/translators/GroupByKeyTranslator.java  |   3 +-
 .../translators/ParDoBoundMultiTranslator.java  |   4 +-
 .../apex/translators/ParDoBoundTranslator.java  |   4 +-
 .../apex/translators/TranslationContext.java|  10 +
 .../functions/ApexGroupByKeyOperator.java   |  12 +-
 .../functions/ApexParDoOperator.java|  11 +-
 .../translators/utils/ApexStateInternals.java   | 438 +++
 .../translators/ParDoBoundTranslatorTest.java   |  62 ++-
 .../utils/ApexStateInternalsTest.java   | 361 +++
 9 files changed, 883 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1db4ff63/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
index d3e7d2d..cb78579 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
@@ -33,7 +33,8 @@ public class GroupByKeyTranslator implements 
TransformTranslator transform, TranslationContext 
context) {
 PCollection> input = context.getInput();
 ApexGroupByKeyOperator group = new 
ApexGroupByKeyOperator<>(context.getPipelineOptions(),
-input);
+input, context.stateInternalsFactory()
+);
 context.addOperator(group, group.output);
 context.addStream(input, group.input);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1db4ff63/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 13f07c1..2678869 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -64,7 +64,9 @@ public class ParDoBoundMultiTranslator
 ApexParDoOperator operator = new ApexParDoOperator<>(
 context.getPipelineOptions(),
 doFn, transform.getMainOutputTag(), 
transform.getSideOutputTags().getAll(),
-context.>getInput().getWindowingStrategy(), sideInputs, 
wvInputCoder);
+context.>getInput().getWindowingStrategy(), sideInputs, 
wvInputCoder,
+context.stateInternalsFactory()
+);
 
 Map, PCollection> outputs = output.getAll();
 Map, OutputPort> ports = 
Maps.newHashMapWithExpectedSize(outputs.size());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1db4ff63/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index bd7115e..92567a6 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -52,7 +52,9 @@ public class ParDoBoundTranslator implements
 ApexParDoOperator operator = new ApexParDoOperator<>(
 context.getPipelineOptions(),
 doFn, new TupleTag(), TupleTagList.empty().getAll() 
/*sideOutputTags*/,
-output.getWindowingStrategy(), sideInputs, wvInputCoder);
+output.getWindowingStrategy(), sideInputs, wvInputCoder,
+context.stateInternalsFactory()
+);
 context.addOperator(operator, operator.output);
 context.addStream(context.getInput(), operator.input);
 if (!sideInputs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incu

[31/39] incubator-beam git commit: BEAM-261 Make translators package private.

2016-11-11 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
deleted file mode 100644
index d32b869..000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * A wrapper to enable serialization of {@link PipelineOptions}.
- */
-public class SerializablePipelineOptions implements Externalizable {
-
-  private transient ApexPipelineOptions pipelineOptions;
-
-  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
-this.pipelineOptions = pipelineOptions;
-  }
-
-  public SerializablePipelineOptions() {
-  }
-
-  public ApexPipelineOptions get() {
-return this.pipelineOptions;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-String s = in.readUTF();
-this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
-.as(ApexPipelineOptions.class);
-  }
-
-}
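
Note on the deleted wrapper above: it keeps the options object in a transient field and writes it out as a string in writeExternal, then rebuilds it in readExternal. A minimal JDK-only sketch of that pattern follows. It is an illustration under assumptions, not the Beam code: DemoOptions and SerializableDemoOptions are hypothetical stand-ins for ApexPipelineOptions and SerializablePipelineOptions, and a plain string replaces the Jackson ObjectMapper encoding used in the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

final class ExternalizableSketch {

  /** Hypothetical stand-in for an options object that is not Serializable itself. */
  static final class DemoOptions {
    final String applicationName;

    DemoOptions(String applicationName) {
      this.applicationName = applicationName;
    }
  }

  /** Wrapper that carries the options across Java serialization as a UTF string. */
  static final class SerializableDemoOptions implements Externalizable {
    private transient DemoOptions options;

    /** Externalizable requires a public no-arg constructor for deserialization. */
    public SerializableDemoOptions() {}

    SerializableDemoOptions(DemoOptions options) {
      this.options = options;
    }

    DemoOptions get() {
      return options;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
      // The original wrote a JSON string here; the sketch writes the single field.
      out.writeUTF(options.applicationName);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
      this.options = new DemoOptions(in.readUTF());
    }
  }

  /** Serializes and deserializes the wrapper in memory. */
  static SerializableDemoOptions roundTrip(SerializableDemoOptions original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SerializableDemoOptions) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    SerializableDemoOptions copy =
        roundTrip(new SerializableDemoOptions(new DemoOptions("demo-app")));
    System.out.println(copy.get().applicationName);
  }
}
```

The string encoding keeps the wire format independent of the options class layout, which is presumably why the original went through JSON rather than default Java serialization.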

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
deleted file mode 100644
index c06c500..000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-
-
-/**
- * A {@link KryoSerializable} holder that uses the specified {@link Coder}.
- * @param 
- */
-public class ValueAndCoderKryoSerializable implements KryoSerializable {
-  private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
-  private T value;
-  private Coder coder;
-
-  public ValueAndCoderKryoS

[08/39] incubator-beam git commit: BEAM-261 Enable checkstyle and cleanup.

2016-11-11 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
new file mode 100644
index 000..ecb0adb
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.functions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 6ee82ea..3188dfa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -18,6 +18,17 @@
 
 package org.apache.beam.runners.apex.translators.io;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import 
org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
@@ -26,27 +37,15 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.google.common.base.Throwables;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
-import java.io.IOException;
-
 /**
  * Apex input operator that wraps Beam {@link UnboundedSource}.
  */
-public class ApexReadUnboundedInputOperator
-implements InputOperator {
+public class ApexReadUnboundedInputOperator implements InputOperator {
   private static final Logger LOG = LoggerFactory.getLogger(
   ApexReadUnboundedInputOperator.class);
   private boolean traceTuples = false;
@@ -58,10 +57,12 @@ public class ApexReadUnboundedInputOperator source;
   private transient UnboundedSource.UnboundedReader reader;
   private transient boolean available = false;
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient 
DefaultOutputPort>> output = new 
DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient 
DefaultOutputPort>> output =
+  new DefaultOutputPort<>();
 
-  public ApexReadUnboundedInputOperator(UnboundedSource source, ApexPipelineOptions options) {
+  public ApexReadUnboundedInputOperator(UnboundedSource source,
+  ApexPipelineOptions options) {
 this.pipelineOptions = new SerializablePipelineOptions(options);
 this.source = source;
   }
@@ -72,8 +73,7 

[37/39] incubator-beam git commit: Merge branch 'master' of https://github.com/apache/incubator-beam into apex-runner

2016-11-11 Thread kenn
Merge branch 'master' of https://github.com/apache/incubator-beam into apex-runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/320d6193
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/320d6193
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/320d6193

Branch: refs/heads/master
Commit: 320d6193d8cd41aa99dce01aa8f8aabac30064fa
Parents: 9900157 49ce270
Author: Thomas Weise 
Authored: Fri Nov 11 21:33:38 2016 +0100
Committer: Thomas Weise 
Committed: Fri Nov 11 21:33:38 2016 +0100

--
 examples/java/pom.xml   |  87 +-
 .../beam/examples/DebuggingWordCount.java   |  67 +-
 .../apache/beam/examples/MinimalWordCount.java  |  50 +-
 .../org/apache/beam/examples/WordCount.java |  62 +-
 .../beam/examples/DebuggingWordCountTest.java   |  15 +-
 .../beam/examples/WindowedWordCountIT.java  |  30 +-
 .../org/apache/beam/examples/WordCountIT.java   |  17 +-
 .../examples/cookbook/BigQueryTornadoesIT.java  |  16 +-
 examples/java8/pom.xml  |  13 +
 .../beam/examples/complete/game/GameStats.java  |   8 +-
 .../examples/complete/game/HourlyTeamScore.java |   8 +-
 .../examples/complete/game/LeaderBoard.java |  10 +-
 .../beam/examples/complete/game/UserScore.java  |   6 +-
 .../examples/complete/game/GameStatsTest.java   |   5 +
 .../complete/game/HourlyTeamScoreTest.java  |   5 +
 .../examples/complete/game/LeaderBoardTest.java |   6 +
 examples/pom.xml|   5 +
 pom.xml |  34 +
 runners/apex/README.md  |   2 +-
 .../beam/runners/core/SimpleDoFnRunner.java |  16 +-
 .../beam/runners/core/SplittableParDo.java  |  25 +-
 .../core/UnboundedReadFromBoundedSource.java|   3 +-
 .../core/triggers/TriggerStateMachines.java |  16 +-
 runners/direct-java/pom.xml |  13 +
 .../beam/runners/direct/DirectRunner.java   | 119 ++-
 .../runners/direct/DoFnLifecycleManager.java|   4 +-
 .../direct/EncodabilityEnforcementFactory.java  |  80 --
 .../ImmutabilityCheckingBundleFactory.java  |  11 +-
 .../direct/ImmutabilityEnforcementFactory.java  |   2 -
 .../direct/ImmutableListBundleFactory.java  |  21 +-
 .../runners/direct/ParDoOverrideFactory.java|   2 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |   6 +-
 .../beam/runners/direct/WatermarkManager.java   | 152 ++--
 .../direct/CloningBundleFactoryTest.java| 122 ++-
 .../ConsumerTrackingPipelineVisitorTest.java|  17 +
 .../beam/runners/direct/DirectRunnerTest.java   |  26 +
 .../EncodabilityEnforcementFactoryTest.java | 323 
 .../direct/ImmutableListBundleFactoryTest.java  |  15 +-
 .../UnboundedReadEvaluatorFactoryTest.java  |  47 +-
 .../runners/direct/WatermarkManagerTest.java|  83 ++
 .../beam/runners/flink/examples/TFIDF.java  |  11 +-
 runners/flink/pom.xml   |   1 +
 .../apache/beam/runners/flink/FlinkRunner.java  |   4 +-
 .../FlinkStreamingTransformTranslators.java |   4 -
 .../SerializableFnAggregatorWrapper.java|   7 +
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../streaming/io/UnboundedSocketSource.java |   5 +-
 .../streaming/io/UnboundedSourceWrapper.java|   3 +-
 runners/google-cloud-dataflow-java/pom.xml  |  28 +-
 .../dataflow/testing/TestDataflowRunner.java|  20 +-
 .../dataflow/util/DataflowTransport.java|   9 +-
 runners/pom.xml |   5 +
 runners/spark/pom.xml   |   9 +
 .../beam/runners/spark/SparkContextOptions.java |  64 ++
 .../runners/spark/SparkPipelineOptions.java |  36 +-
 .../spark/translation/SparkContextFactory.java  |  19 +-
 .../SparkRunnerStreamingContextFactory.java |   3 +-
 .../runners/spark/ProvidedSparkContextTest.java |   6 +-
 .../streaming/KafkaStreamingTest.java   |   4 +-
 sdks/java/core/pom.xml  |  31 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 126 +--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 134 +--
 .../org/apache/beam/sdk/options/GcpOptions.java | 136 +--
 .../beam/sdk/testing/BigqueryMatcher.java   |  15 +-
 .../beam/sdk/testing/TestPipelineOptions.java   |   6 +
 .../org/apache/beam/sdk/transforms/DoFn.java|  20 +
 .../beam/sdk/transforms/DoFnAdapters.java   |  28 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  47 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java| 828 +++
 .../reflect/ByteBuddyOnTimerInvokerFactory.java | 279 +++
 .../transforms/reflect/DoFnInvokerFactory.java  |  27 +
 .../sdk/transforms/reflect/DoFnInvokers.java| 658 +--
 .../sdk/transforms/reflect/DoFnSignature.java   |  67 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  

[12/39] incubator-beam git commit: BEAM-783 Add test to cover side inputs and outputs.

2016-11-11 Thread kenn
BEAM-783 Add test to cover side inputs and outputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7105d925
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7105d925
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7105d925

Branch: refs/heads/master
Commit: 7105d925c51a49798849f01f1d7e0b4f3d4f51ad
Parents: c9f1406
Author: Thomas Weise 
Authored: Wed Oct 19 19:11:54 2016 -0700
Committer: Thomas Weise 
Committed: Wed Oct 19 19:11:54 2016 -0700

--
 .../translators/ParDoBoundMultiTranslator.java  | 14 ++-
 .../functions/ApexFlattenOperator.java  |  3 +-
 .../translators/ParDoBoundTranslatorTest.java   | 96 +++-
 3 files changed, 107 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 13f07c1..9135dd8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -68,9 +68,19 @@ public class ParDoBoundMultiTranslator
 
 Map, PCollection> outputs = output.getAll();
 Map, OutputPort> ports = Maps.newHashMapWithExpectedSize(outputs.size());
-int i = 0;
 for (Map.Entry, PCollection> outputEntry : outputs.entrySet()) {
-  ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]);
+  if (outputEntry.getKey() == transform.getMainOutputTag()) {
+ports.put(outputEntry.getValue(), operator.output);
+  } else {
+int portIndex = 0;
+for (TupleTag tag : transform.getSideOutputTags().getAll()) {
+  if (tag == outputEntry.getKey()) {
+ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
+break;
+  }
+  portIndex++;
+}
+  }
 }
 context.addOperator(operator, ports);
 context.addStream(context.getInput(), operator.input);
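
Note on the hunk above: the old loop handed out side output ports in map iteration order, so the main output could land on a side port. The new loop maps the main output tag to the main port and each side output tag to the port at that tag's position in the side output tag list. A minimal sketch of that assignment follows, assuming plain strings in place of TupleTag and OutputPort; the class and method names are hypothetical, and the sketch compares with equals() where the committed code uses reference equality (==).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PortMappingSketch {

  /**
   * Assigns a port to every output tag: the main tag gets the main port,
   * a side tag gets the side port at its index in sideTags.
   */
  static Map<String, String> assignPorts(
      String mainTag,
      List<String> sideTags,
      List<String> outputTags,
      String mainPort,
      String[] sidePorts) {
    Map<String, String> ports = new LinkedHashMap<>();
    for (String tag : outputTags) {
      if (tag.equals(mainTag)) {
        ports.put(tag, mainPort);
      } else {
        int portIndex = sideTags.indexOf(tag);
        if (portIndex >= 0) {
          ports.put(tag, sidePorts[portIndex]);
        }
      }
    }
    return ports;
  }

  public static void main(String[] args) {
    // Outputs arrive in an arbitrary order, as they would from a map.
    Map<String, String> ports = assignPorts(
        "main",
        Arrays.asList("a", "b"),
        Arrays.asList("b", "main", "a"),
        "output",
        new String[] {"side0", "side1"});
    System.out.println(ports); // prints {b=side1, main=output, a=side0}
  }
}
```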

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index dd8fcd1..703b1f4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public class ApexFlattenOperator extends BaseOperator {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ApexFlattenOperator.class);
-  private boolean traceTuples = true;
+  private boolean traceTuples = false;
 
   private long inputWM1;
   private long inputWM2;
@@ -121,4 +121,5 @@ public class ApexFlattenOperator extends 
BaseOperator {
   @OutputPortFieldAnnotation(optional = true)
   public final transient 
DefaultOutputPort>> out =
 new DefaultOutputPort>>();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
--
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index ad22acd..72b4299 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -26,6 +26,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -48,9 +50,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.trans

[24/39] incubator-beam git commit: BEAM-858 Enable ApexRunner integration test in examples.

2016-11-11 Thread kenn
BEAM-858 Enable ApexRunner integration test in examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77f4ba2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77f4ba2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77f4ba2e

Branch: refs/heads/master
Commit: 77f4ba2eebf0c02dc95a8b90b4277a12638c4300
Parents: 968eb32
Author: Thomas Weise 
Authored: Fri Oct 28 23:47:42 2016 -0700
Committer: Thomas Weise 
Committed: Tue Nov 1 03:35:15 2016 +0100

--
 examples/java/pom.xml   | 31 
 runners/apex/pom.xml|  2 +-
 .../beam/runners/apex/ApexPipelineOptions.java  |  6 
 .../runners/apex/ApexPipelineTranslator.java|  2 +-
 .../apache/beam/runners/apex/ApexRunner.java| 12 
 .../beam/runners/apex/ApexRunnerResult.java | 27 +++--
 .../beam/runners/apex/TestApexRunner.java   | 20 +++--
 .../io/ApexReadUnboundedInputOperator.java  | 13 ++--
 .../apex/examples/StreamingWordCountTest.java   |  4 +--
 .../translators/ParDoBoundTranslatorTest.java   |  6 ++--
 10 files changed, 91 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index fc82ed4..6c1a16f 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -85,6 +85,14 @@
 
 
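       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-apex</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-spark</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>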
@@ -224,6 +232,29 @@
   
 
   
+  
+apex-runner-integration-tests
+
+  integration-test
+  verify
+
+
+  
+WordCountIT.java
+  
+  all
+  4
+  
+
+  [
+  "--project=apache-beam-testing",
+  "--tempRoot=gs://temp-storage-for-end-to-end-tests",
+  "--runner=org.apache.beam.runners.apex.TestApexRunner"
+  ]
+
+  
+
+  
 
   
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 6ccc0da..d4bcc3d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -72,7 +72,7 @@
   org.apache.apex
   apex-engine
   ${apex.core.version}
-  test
+  runtime
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index 141a8c1..54fdf76 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -32,12 +32,6 @@ public interface ApexPipelineOptions extends 
PipelineOptions, java.io.Serializab
 
   String getApplicationName();
 
-  @Description("set parallelism for Apex runner")
-  void setParallelism(int parallelism);
-
-  @Default.Integer(1)
-  int getParallelism();
-
   @Description("execute the pipeline with embedded cluster")
   void setEmbeddedExecution(boolean embedded);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index a6857ee..8a87ce0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -150,7 +150,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
   BoundedToUnboundedSourceAdapter unboundedSource = new 
BoundedTo

[07/39] incubator-beam git commit: BEAM-261 Checkpointing for pushed back inputs.

2016-11-11 Thread kenn
BEAM-261 Checkpointing for pushed back inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd7f46c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd7f46c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd7f46c1

Branch: refs/heads/master
Commit: fd7f46c19b9c95a63b522793bb6fb8a849167cbc
Parents: 047cff4
Author: Thomas Weise 
Authored: Thu Oct 13 00:56:37 2016 -0700
Committer: Thomas Weise 
Committed: Mon Oct 17 09:22:16 2016 -0700

--
 .../apache/beam/runners/apex/ApexRunner.java|  3 +-
 .../translators/ParDoBoundMultiTranslator.java  | 10 ++-
 .../apex/translators/ParDoBoundTranslator.java  | 11 ++-
 .../functions/ApexParDoOperator.java| 32 +---
 .../utils/ValueAndCoderKryoSerializable.java| 81 
 .../translators/ParDoBoundTranslatorTest.java   | 42 ++
 6 files changed, 149 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index e2ebc29..ad49f08 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -72,7 +72,7 @@ public class ApexRunner extends 
PipelineRunner {
   private final ApexPipelineOptions options;
 
   /**
-   * TODO: this isn't thread sa
+   * TODO: this isn't thread safe and may cause issues when tests run in parallel
* Holds any most resent assertion error that was raised while processing elements.
* Used in the unit test driver in embedded to propagate the exception.
*/
@@ -89,7 +89,6 @@ public class ApexRunner extends 
PipelineRunner {
   @Override
   public  OutputT apply(
   PTransform transform, InputT input) {
-//System.out.println("transform: " + transform);
 
 if (Window.Bound.class.equals(transform.getClass())) {
   return (OutputT) ((PCollection) input).apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 6488bf6..9c5f2b5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -22,8 +22,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -43,10 +46,15 @@ public class ParDoBoundMultiTranslator 
implements TransformTran
   public void translate(ParDo.BoundMulti transform, 
TranslationContext context) {
 OldDoFn doFn = transform.getFn();
 PCollectionTuple output = context.getOutput();
+PCollection input = context.getInput();
 List> sideInputs = transform.getSideInputs();
+Coder inputCoder = input.getCoder();
+WindowedValueCoder wvInputCoder = 
FullWindowedValueCoder.of(inputCoder,
+input.getWindowingStrategy().getWindowFn().windowCoder());
+
 ApexParDoOperator operator = new 
ApexParDoOperator<>(context.getPipelineOptions(),
 doFn, transform.getMainOutputTag(), 
transform.getSideOutputTags().getAll(),
-context.>getInput().getWindowingStrategy(), sideInputs);
+context.>getInput().getWindowingStrategy(), sideInputs, 
wvInputCoder);
 
 Map, PCollection> outputs = output.getAll();
 Map, OutputPort> ports = 
Maps.newHashMapWithExpectedSize(outputs.size());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/t

[04/39] incubator-beam git commit: BEAM-261 Read.Bounded and FlattenPCollection.

2016-11-11 Thread kenn
BEAM-261 Read.Bounded and FlattenPCollection.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/074b18f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/074b18f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/074b18f6

Branch: refs/heads/master
Commit: 074b18f6ae0cfc1a3cc986f89ded6a45e0a3eb57
Parents: a7e430d
Author: Thomas Weise 
Authored: Sun Sep 11 20:34:08 2016 -0700
Committer: Thomas Weise 
Committed: Sun Oct 16 23:25:28 2016 -0700

--
 runners/apex/pom.xml|   2 +-
 .../runners/apex/ApexPipelineTranslator.java|  16 ++
 .../apache/beam/runners/apex/ApexRunner.java|  10 +-
 .../FlattenPCollectionTranslator.java   |  53 -
 .../apex/translators/TranslationContext.java|  24 +--
 .../functions/ApexGroupByKeyOperator.java   |   6 +-
 .../functions/ApexParDoOperator.java|   6 +-
 .../beam/runners/apex/examples/IntTests.java| 207 +++
 .../translators/ReadUnboundTranslatorTest.java  |   2 +-
 9 files changed, 284 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 21e53a8..e9377b4 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -28,7 +28,7 @@
 ../pom.xml
   
 
-  beam-runners-apex_3.4.0
+  beam-runners-apex
 
   Apache Beam :: Runners :: Apex
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index 8ea7139..b0391b4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -25,6 +25,8 @@ import 
org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
 import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.apex.translators.TransformTranslator;
 import org.apache.beam.runners.apex.translators.TranslationContext;
+import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import 
org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -64,6 +66,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
 // register TransformTranslators
 registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
 registerTransformTranslator(Read.Unbounded.class, new 
ReadUnboundedTranslator());
+registerTransformTranslator(Read.Bounded.class, new 
ReadBoundedTranslator());
 registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
 registerTransformTranslator(Flatten.FlattenPCollectionList.class,
 new FlattenPCollectionTranslator());
@@ -130,5 +133,18 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
 return transformTranslators.get(transformClass);
   }
 
+  private static class ReadBoundedTranslator implements 
TransformTranslator> {
+private static final long serialVersionUID = 1L;
+
+@Override
+public void translate(Read.Bounded transform, TranslationContext context) {
+  // TODO: adapter is visibleForTesting
+  BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(transform.getSource());
+  ApexReadUnboundedInputOperator operator = new ApexReadUnboundedInputOperator<>(
+  unboundedSource, context.getPipelineOptions());
+  context.addOperator(operator, operator.output);
+}
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 87c8f97..5fa3f23 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -20,10 +20,7 @@ package org.apache.beam.runners.apex;
 import static com.google.common.base.Preconditions

[10/39] incubator-beam git commit: BEAM-261 Support multiple side inputs.

2016-11-11 Thread kenn
BEAM-261 Support multiple side inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ec7cd91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ec7cd91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ec7cd91

Branch: refs/heads/master
Commit: 1ec7cd9129fc31ece7554e2ea18535ce15e46bcf
Parents: fd7f46c
Author: Thomas Weise 
Authored: Thu Oct 13 14:38:06 2016 -0700
Committer: Thomas Weise 
Committed: Mon Oct 17 09:22:49 2016 -0700

--
 .../runners/apex/ApexPipelineTranslator.java| 19 ++-
 .../apache/beam/runners/apex/ApexRunner.java|  7 ++-
 .../beam/runners/apex/ApexRunnerResult.java |  7 +++
 .../FlattenPCollectionTranslator.java   | 38 +++---
 .../translators/ParDoBoundMultiTranslator.java  | 55 +---
 .../apex/translators/ParDoBoundTranslator.java  | 14 +
 .../functions/ApexFlattenOperator.java  | 11 
 .../functions/ApexParDoOperator.java| 13 +++--
 .../apex/translators/utils/ApexStreamTuple.java | 22 ++--
 9 files changed, 148 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index 40edfb1..a16f551 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
@@ -74,7 +75,8 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
 registerTransformTranslator(Flatten.FlattenPCollectionList.class,
 new FlattenPCollectionTranslator());
 registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
-registerTransformTranslator(CreateApexPCollectionView.class, new CreatePCollectionViewTranslator());
+registerTransformTranslator(CreateApexPCollectionView.class, new CreateApexPCollectionViewTranslator());
+registerTransformTranslator(CreatePCollectionView.class, new CreatePCollectionViewTranslator());
   }
 
   public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -151,7 +153,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
 
   }
 
-  private static class CreatePCollectionViewTranslator implements TransformTranslator>
+  private static class CreateApexPCollectionViewTranslator implements TransformTranslator>
   {
 private static final long serialVersionUID = 1L;
 
@@ -164,4 +166,17 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
 }
   }
 
+  private static class CreatePCollectionViewTranslator implements TransformTranslator>
+  {
+private static final long serialVersionUID = 1L;
+
+@Override
+public void translate(CreatePCollectionView transform, TranslationContext context)
+{
+  PCollectionView view = transform.getView();
+  context.addView(view);
+  LOG.debug("view {}", view.getName());
+}
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index ad49f08..667f1c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -74,7 +74,7 @@ public class ApexRunner extends PipelineRunner {
   /**
* TODO: this isn't thread safe and may cause issues when tests run in parallel
* Holds any most resent assertion error that was raised while processing elements.
-   * Used in the unit test driver in embedded to propagate the exception.
+   * Used in the unit test driver in embedded mode to propagate the exception.
*/
   public static volatile AssertionError assertionError;
 
@@ -100,6 +100,8 @@ public class ApexRunner extends PipelineRunner {
 

[17/39] incubator-beam git commit: [BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar

2016-11-11 Thread kenn
[BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8f8a80d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8f8a80d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8f8a80d

Branch: refs/heads/master
Commit: c8f8a80d4c6846fd941fbba08727b7a3ecaca7e1
Parents: c9f1406
Author: Ismaël Mejía 
Authored: Mon Oct 24 11:30:46 2016 +0200
Committer: Ismaël Mejía 
Committed: Tue Oct 25 22:36:19 2016 +0200

--
 runners/apex/pom.xml|  6 +-
 .../apache/beam/runners/apex/ApexRunner.java|  5 +-
 .../beam/runners/apex/ApexRunnerRegistrar.java  | 61 
 .../apex/translators/utils/ApexStreamTuple.java |  2 +-
 .../runners/apex/ApexRunnerRegistrarTest.java   | 47 +++
 5 files changed, 116 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8b62410..de191f5 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -138,11 +138,11 @@
   test
 
 
-
+
   
 
   
@@ -183,7 +183,7 @@
   
 
   [
-"--runner=org.apache.beam.runners.apex.TestApexRunner"
+"--runner=TestApexRunner"
   ]
 
   

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f3c44bb..8da4ec3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.apex.translators.TranslationContext;
 import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
@@ -75,7 +76,9 @@ public class ApexRunner extends PipelineRunner {
   }
 
   public static ApexRunner fromOptions(PipelineOptions options) {
-return new ApexRunner((ApexPipelineOptions) options);
+ApexPipelineOptions apexPipelineOptions =
+PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+return new ApexRunner(apexPipelineOptions);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
new file mode 100644
index 000..aa6ef45
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link 

[03/39] incubator-beam git commit: BEAM-261 Use Apex 3.5.0-SNAPSHOT to use loopback as connect address.

2016-11-11 Thread kenn
BEAM-261 Use Apex 3.5.0-SNAPSHOT to use loopback as connect address.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7e430d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7e430d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7e430d6

Branch: refs/heads/master
Commit: a7e430d6b24de53a116258af75c7eb15d6133b4d
Parents: aaf38dd
Author: Thomas Weise 
Authored: Wed Aug 31 16:41:52 2016 -0700
Committer: Thomas Weise 
Committed: Sun Oct 16 23:22:59 2016 -0700

--
 runners/apex/pom.xml   | 6 +++---
 .../beam/runners/apex/translators/CreateValuesTranslator.java  | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7e430d6/runners/apex/pom.xml
--
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index bb08b3c..21e53a8 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -35,9 +35,9 @@
   jar
 
   
-3.4.0
+3.5.0-SNAPSHOT
 3.4.0
-true
+false
 
 -Xmx2048m
   
@@ -206,7 +206,7 @@
 
 
   
-org.apache.apex:apex-api:jar:3.4.0
+org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT
 org.apache.commons:commons-lang3::3.1
 com.esotericsoftware.kryo:kryo::2.24.0
 com.datatorrent:netlet::1.2.1

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7e430d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
index 387b19f..7a29057 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.apex.translators.io.ValuesSource;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
 
 import com.google.common.base.Throwables;
 
@@ -38,7 +39,7 @@ public class CreateValuesTranslator implements 
TransformTranslator transform, TranslationContext 
context) {
 try {
   UnboundedSource unboundedSource = new ValuesSource<>(transform.getElements(),
-  transform.getDefaultOutputCoder(context.getInput()));
+  transform.getDefaultOutputCoder((PBegin)context.getInput()));
   ApexReadUnboundedInputOperator operator = new ApexReadUnboundedInputOperator<>(unboundedSource,
   context.getPipelineOptions());
   context.addOperator(operator, operator.output);



[39/39] incubator-beam git commit: Merge apex-runner to master. This closes #1305.

2016-11-11 Thread kenn
Merge apex-runner to master. This closes #1305.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d069a65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d069a65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d069a65

Branch: refs/heads/master
Commit: 7d069a65b4bb264bce279b19a1dc0f7372ce43fb
Parents: e2c2159 41394c2
Author: Kenneth Knowles 
Authored: Fri Nov 11 18:19:34 2016 -0800
Committer: Kenneth Knowles 
Committed: Fri Nov 11 18:19:34 2016 -0800

--
 examples/java/pom.xml   |  35 ++
 pom.xml |   6 +
 runners/apex/README.md  |  76 +++
 runners/apex/pom.xml| 234 +
 .../beam/runners/apex/ApexPipelineOptions.java  |  60 +++
 .../apache/beam/runners/apex/ApexRunner.java| 398 
 .../beam/runners/apex/ApexRunnerRegistrar.java  |  61 +++
 .../beam/runners/apex/ApexRunnerResult.java | 110 +
 .../beam/runners/apex/TestApexRunner.java   |  73 +++
 .../apache/beam/runners/apex/package-info.java  |  22 +
 .../translation/ApexPipelineTranslator.java | 179 +++
 .../translation/CreateValuesTranslator.java |  48 ++
 .../FlattenPCollectionTranslator.java   | 129 +
 .../apex/translation/GroupByKeyTranslator.java  |  42 ++
 .../translation/ParDoBoundMultiTranslator.java  | 142 ++
 .../apex/translation/ParDoBoundTranslator.java  |  64 +++
 .../translation/ReadUnboundedTranslator.java|  42 ++
 .../apex/translation/TransformTranslator.java   |  31 ++
 .../apex/translation/TranslationContext.java| 178 +++
 .../operators/ApexFlattenOperator.java  | 125 +
 .../operators/ApexGroupByKeyOperator.java   | 475 +++
 .../operators/ApexParDoOperator.java| 375 +++
 .../ApexReadUnboundedInputOperator.java | 155 ++
 .../translation/operators/package-info.java |  22 +
 .../runners/apex/translation/package-info.java  |  22 +
 .../translation/utils/ApexStateInternals.java   | 438 +
 .../apex/translation/utils/ApexStreamTuple.java | 222 +
 .../utils/CoderAdapterStreamCodec.java  |  69 +++
 .../apex/translation/utils/NoOpStepContext.java |  72 +++
 .../utils/SerializablePipelineOptions.java  |  60 +++
 .../utils/ValueAndCoderKryoSerializable.java|  77 +++
 .../apex/translation/utils/ValuesSource.java| 149 ++
 .../apex/translation/utils/package-info.java|  22 +
 .../runners/apex/ApexRunnerRegistrarTest.java   |  47 ++
 .../apex/examples/UnboundedTextSource.java  | 142 ++
 .../runners/apex/examples/WordCountTest.java| 188 
 .../runners/apex/examples/package-info.java |  22 +
 .../translation/ApexGroupByKeyOperatorTest.java | 117 +
 .../FlattenPCollectionTranslatorTest.java   |  99 
 .../translation/GroupByKeyTranslatorTest.java   | 246 ++
 .../translation/ParDoBoundTranslatorTest.java   | 340 +
 .../translation/ReadUnboundTranslatorTest.java  | 129 +
 .../utils/ApexStateInternalsTest.java   | 361 ++
 .../translation/utils/CollectionSource.java | 136 ++
 .../translation/utils/PipelineOptionsTest.java  |  84 
 .../apex/src/test/resources/log4j.properties|  35 ++
 runners/apex/src/test/resources/words.txt   |   3 +
 runners/pom.xml |   1 +
 48 files changed, 6163 insertions(+)
--




[22/39] incubator-beam git commit: Merge branch 'master' into apex-runner

2016-11-11 Thread kenn
Merge branch 'master' into apex-runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fc47ed1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fc47ed1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fc47ed1

Branch: refs/heads/master
Commit: 6fc47ed101f02aacacecd5c62c4a026eaba3e12c
Parents: fa3a6aa 215980a
Author: Thomas Weise 
Authored: Thu Oct 27 19:25:06 2016 -0700
Committer: Thomas Weise 
Committed: Thu Oct 27 19:25:06 2016 -0700

--
 .travis.yml |  22 +-
 .../beam/examples/DebuggingWordCount.java   |   2 +-
 .../apache/beam/examples/MinimalWordCount.java  |   2 +-
 .../org/apache/beam/examples/WordCount.java |   2 +-
 .../apache/beam/examples/complete/TfIdf.java|   2 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/cookbook/BigQueryTornadoes.java|   2 +-
 .../cookbook/CombinePerKeyExamples.java |   2 +-
 .../beam/examples/cookbook/DeDupExample.java|   2 +-
 .../beam/examples/cookbook/FilterExamples.java  |   2 +-
 .../beam/examples/cookbook/JoinExamples.java|   2 +-
 .../examples/cookbook/MaxPerKeyExamples.java|   2 +-
 .../org/apache/beam/examples/WordCountTest.java |   2 +-
 .../examples/complete/AutoCompleteTest.java |   6 +-
 .../beam/examples/complete/TfIdfTest.java   |   2 +-
 .../complete/TopWikipediaSessionsTest.java  |   2 +-
 .../examples/cookbook/DeDupExampleTest.java |   4 +-
 .../examples/cookbook/JoinExamplesTest.java |   2 +-
 .../examples/cookbook/TriggerExampleTest.java   |   2 +-
 .../beam/examples/MinimalWordCountJava8.java|   2 +-
 .../beam/examples/complete/game/GameStats.java  |  52 +-
 .../examples/complete/game/HourlyTeamScore.java |  30 +-
 .../examples/complete/game/LeaderBoard.java |  43 +-
 .../beam/examples/complete/game/UserScore.java  |  16 +-
 .../complete/game/utils/WriteToBigQuery.java|  49 +-
 .../game/utils/WriteWindowedToBigQuery.java |  14 +-
 .../examples/complete/game/GameStatsTest.java   |   2 +-
 .../complete/game/HourlyTeamScoreTest.java  |   2 +-
 .../examples/complete/game/LeaderBoardTest.java |  10 +-
 .../examples/complete/game/UserScoreTest.java   |   6 +-
 pom.xml |  47 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java|  19 -
 .../runners/direct/CloningBundleFactory.java|  98 
 .../beam/runners/direct/DirectRunner.java   |   7 +-
 .../runners/direct/DoFnLifecycleManager.java|  56 +-
 .../GroupAlsoByWindowEvaluatorFactory.java  |   8 +-
 .../direct/ImmutableListBundleFactory.java  |   4 +-
 .../beam/runners/direct/WatermarkManager.java   |  17 +-
 .../direct/WriteWithShardingFactory.java|   6 +-
 .../direct/CloningBundleFactoryTest.java| 177 +++
 .../ConsumerTrackingPipelineVisitorTest.java|  32 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  40 +-
 .../direct/DoFnLifecycleManagerTest.java|  74 ++-
 .../EncodabilityEnforcementFactoryTest.java |   6 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   8 +-
 .../ImmutabilityEnforcementFactoryTest.java |   8 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  |   8 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   8 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |  87 ++--
 .../direct/ParDoSingleEvaluatorFactoryTest.java |  94 ++--
 .../runners/direct/WatermarkManagerTest.java|   8 +-
 .../dataflow/BlockingDataflowRunner.java|  13 +-
 .../runners/dataflow/DataflowPipelineJob.java   |  17 +-
 .../dataflow/DataflowPipelineTranslator.java|   4 +
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +-
 .../options/DataflowPipelineOptions.java|  12 +
 .../beam/runners/dataflow/util/DoFnInfo.java|  43 +-
 runners/spark/pom.xml   |  66 ++-
 .../runners/spark/SparkPipelineOptions.java |  11 +
 .../apache/beam/runners/spark/SparkRunner.java  |  19 -
 .../metrics/AggregatorMetricSource.java |   9 +-
 .../metrics/WithNamedAggregatorsSupport.java|   7 +-
 .../coders/BeamSparkRunnerRegistrator.java  |  46 ++
 .../runners/spark/io/EmptyCheckpointMark.java   |  52 ++
 .../apache/beam/runners/spark/io/KafkaIO.java   | 131 -
 .../beam/runners/spark/io/MicrobatchSource.java | 262 ++
 .../beam/runners/spark/io/SourceDStream.java| 156 ++
 .../apache/beam/runners/spark/io/SourceRDD.java |  75 ++-
 .../runners/spark/io/SparkUnboundedSource.java  | 167 ++
 .../spark/stateful/StateSpecFunctions.java  | 167 ++
 .../runners/spark/stateful/package-info.java|  22 +
 .../spark/translation/EvaluationContext.java|   6 +-
 .../translation/GroupCombineFunctions.java  |  66 +--
 .../spark/translation/SparkContextFactory.java  |   5 +-
 .../spark/translation/SparkRuntimeCo

[25/39] incubator-beam git commit: Closes #1227

2016-11-11 Thread kenn
Closes #1227


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51af7e59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51af7e59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51af7e59

Branch: refs/heads/master
Commit: 51af7e592327ef711673a2b9828536051a6c3898
Parents: 968eb32 77f4ba2
Author: Thomas Weise 
Authored: Wed Nov 2 21:16:24 2016 -0700
Committer: Thomas Weise 
Committed: Wed Nov 2 21:16:24 2016 -0700

--
 examples/java/pom.xml   | 31 
 runners/apex/pom.xml|  2 +-
 .../beam/runners/apex/ApexPipelineOptions.java  |  6 
 .../runners/apex/ApexPipelineTranslator.java|  2 +-
 .../apache/beam/runners/apex/ApexRunner.java| 12 
 .../beam/runners/apex/ApexRunnerResult.java | 27 +++--
 .../beam/runners/apex/TestApexRunner.java   | 20 +++--
 .../io/ApexReadUnboundedInputOperator.java  | 13 ++--
 .../apex/examples/StreamingWordCountTest.java   |  4 +--
 .../translators/ParDoBoundTranslatorTest.java   |  6 ++--
 10 files changed, 91 insertions(+), 32 deletions(-)
--




[23/39] incubator-beam git commit: Adjust for merge from master.

2016-11-11 Thread kenn
Adjust for merge from master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/968eb32b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/968eb32b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/968eb32b

Branch: refs/heads/master
Commit: 968eb32b8a77d5613f7645c0f04ce194588e
Parents: 6fc47ed
Author: Thomas Weise 
Authored: Thu Oct 27 20:21:52 2016 -0700
Committer: Thomas Weise 
Committed: Thu Oct 27 20:21:52 2016 -0700

--
 .../main/java/org/apache/beam/runners/apex/ApexRunnerResult.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968eb32b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
--
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index d5613fe..4e3a8d2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -62,12 +62,12 @@ public class ApexRunnerResult implements PipelineResult {
   }
 
   @Override
-  public State waitUntilFinish(Duration duration) throws IOException, InterruptedException {
+  public State waitUntilFinish(Duration duration) {
 throw new UnsupportedOperationException();
   }
 
   @Override
-  public State waitUntilFinish() throws IOException, InterruptedException {
+  public State waitUntilFinish() {
 throw new UnsupportedOperationException();
   }
 



[16/39] incubator-beam git commit: Closes #1139

2016-11-11 Thread kenn
Closes #1139


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989e3998
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989e3998
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989e3998

Branch: refs/heads/master
Commit: 989e399874a5f0ebcf7c19f24a7fd18cead7bfba
Parents: 0a1b278 7105d92
Author: Thomas Weise 
Authored: Tue Oct 25 11:01:15 2016 -0700
Committer: Thomas Weise 
Committed: Tue Oct 25 11:01:15 2016 -0700

--
 .../translators/ParDoBoundMultiTranslator.java  | 14 ++-
 .../functions/ApexFlattenOperator.java  |  3 +-
 .../translators/ParDoBoundTranslatorTest.java   | 96 +++-
 3 files changed, 107 insertions(+), 6 deletions(-)
--




[jira] [Resolved] (BEAM-561) Add WindowedWordCountIT

2016-11-11 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles resolved BEAM-561.
--
   Resolution: Fixed
Fix Version/s: 0.3.0-incubating

> Add WindowedWordCountIT
> ---
>
> Key: BEAM-561
> URL: https://issues.apache.org/jira/browse/BEAM-561
> Project: Beam
>  Issue Type: Bug
>Reporter: Jason Kuster
>Assignee: Mark Liu
> Fix For: 0.3.0-incubating
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-966) Run WindowedWordCount Integration Test in Apex runner

2016-11-11 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-966:
-
Summary: Run WindowedWordCount Integration Test in Apex runner  (was: Running WindowedWordCount Integration Test in Apex runner)

> Run WindowedWordCount Integration Test in Apex runner
> -
>
> Key: BEAM-966
> URL: https://issues.apache.org/jira/browse/BEAM-966
> Project: Beam
>  Issue Type: Improvement
>Reporter: Kenneth Knowles
>Assignee: Thomas Weise
>
> The purpose of running WindowedWordCountIT in Spark is to have a streaming 
> test pipeline running in Jenkins pre-commit using TestSparkRunner.
> More discussion happened here:
> https://github.com/apache/incubator-beam/pull/1045#issuecomment-251531770





[jira] [Updated] (BEAM-719) Run WindowedWordCount Integration Test in Spark

2016-11-11 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-719:
-
Summary: Run WindowedWordCount Integration Test in Spark  (was: Running WindowedWordCount Integration Test in Spark)

> Run WindowedWordCount Integration Test in Spark
> ---
>
> Key: BEAM-719
> URL: https://issues.apache.org/jira/browse/BEAM-719
> Project: Beam
>  Issue Type: Improvement
>Reporter: Mark Liu
>Assignee: Amit Sela
>
> The purpose of running WindowedWordCountIT in Spark is to have a streaming 
> test pipeline running in Jenkins pre-commit using TestSparkRunner.
> More discussion happened here:
> https://github.com/apache/incubator-beam/pull/1045#issuecomment-251531770





[jira] [Updated] (BEAM-720) Run WindowedWordCount Integration Test in Flink

2016-11-11 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-720:
-
Summary: Run WindowedWordCount Integration Test in Flink  (was: Running WindowedWordCount Integration Test in Flink)

> Run WindowedWordCount Integration Test in Flink
> ---
>
> Key: BEAM-720
> URL: https://issues.apache.org/jira/browse/BEAM-720
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Mark Liu
>Assignee: Aljoscha Krettek
>
> In order to have coverage of streaming pipeline test in pre-commit, it's 
> important to have TestFlinkRunner to be able to run WindowedWordCountIT 
> successfully. 
> Relevant works in TestDataflowRunner:
> https://github.com/apache/incubator-beam/pull/1045





[jira] [Updated] (BEAM-966) Running WindowedWordCount Integration Test in Apex runner

2016-11-11 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-966:
-
Assignee: Thomas Weise  (was: Amit Sela)

> Running WindowedWordCount Integration Test in Apex runner
> -
>
> Key: BEAM-966
> URL: https://issues.apache.org/jira/browse/BEAM-966
> Project: Beam
>  Issue Type: Improvement
>Reporter: Kenneth Knowles
>Assignee: Thomas Weise
>
> The purpose of running WindowedWordCountIT in Spark is to have a streaming 
> test pipeline running in Jenkins pre-commit using TestSparkRunner.
> More discussion happened here:
> https://github.com/apache/incubator-beam/pull/1045#issuecomment-251531770





[jira] [Created] (BEAM-966) Running WindowedWordCount Integration Test in Apex runner

2016-11-11 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-966:


 Summary: Running WindowedWordCount Integration Test in Apex runner
 Key: BEAM-966
 URL: https://issues.apache.org/jira/browse/BEAM-966
 Project: Beam
  Issue Type: Improvement
Reporter: Kenneth Knowles
Assignee: Amit Sela


The purpose of running WindowedWordCountIT in Spark is to have a streaming test 
pipeline running in Jenkins pre-commit using TestSparkRunner.

More discussion happened here:
https://github.com/apache/incubator-beam/pull/1045#issuecomment-251531770





[jira] [Commented] (BEAM-310) Exercise splitIntoBundles/generateInitialSplits in the Direct Runner

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658789#comment-15658789
 ] 

ASF GitHub Bot commented on BEAM-310:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1339


> Exercise splitIntoBundles/generateInitialSplits in the Direct Runner
> 
>
> Key: BEAM-310
> URL: https://issues.apache.org/jira/browse/BEAM-310
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-direct
>Reporter: Thomas Groh
>Assignee: Thomas Groh
> Fix For: 0.3.0-incubating
>
>
> BoundedSource#splitIntoBundles and UnboundedSource#generateInitialSplits are 
> the methods by which sources can be accessed in parallel. Exercising these 
> methods allows reads (and all transforms downstream) to be executed in 
> parallel both pre and post a GroupByKey
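
The idea in the description above can be sketched in plain Java. This is only an illustration under stated assumptions: RangeSource, split, readSum and parallelSum are hypothetical names invented for this example and do not reproduce the signatures of BoundedSource#splitIntoBundles or UnboundedSource#generateInitialSplits. The sketch splits one bounded input into contiguous bundles and reads each bundle on its own thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelReadSketch {
  /** Hypothetical bounded source over the half-open range [start, end). Not a Beam class. */
  static final class RangeSource {
    final long start;
    final long end;

    RangeSource(long start, long end) {
      this.start = start;
      this.end = end;
    }

    /** Splits the range into at most desiredBundles contiguous, non-empty sub-ranges. */
    List<RangeSource> split(int desiredBundles) {
      List<RangeSource> bundles = new ArrayList<>();
      long size = end - start;
      for (int i = 0; i < desiredBundles; i++) {
        long from = start + size * i / desiredBundles;
        long to = start + size * (i + 1) / desiredBundles;
        if (from < to) {
          bundles.add(new RangeSource(from, to));
        }
      }
      return bundles;
    }

    /** "Reads" the bundle by summing every value in it. */
    long readSum() {
      long sum = 0;
      for (long v = start; v < end; v++) {
        sum += v;
      }
      return sum;
    }
  }

  /** Reads all bundles of the source concurrently and combines the per-bundle results. */
  static long parallelSum(RangeSource source, int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Callable<Long>> tasks = new ArrayList<>();
      for (RangeSource bundle : source.split(parallelism)) {
        tasks.add(bundle::readSum);
      }
      long total = 0;
      for (Future<Long> result : pool.invokeAll(tasks)) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

Because every bundle covers a disjoint sub-range, the combined result is the same as a single sequential read, which is the property a runner relies on when it executes splits in parallel.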





[GitHub] incubator-beam pull request #1339: [BEAM-310] Actually Split Root Transforms

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1339




[2/2] incubator-beam git commit: This closes #1339

2016-11-11 Thread tgroh
This closes #1339


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2c21599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2c21599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2c21599

Branch: refs/heads/master
Commit: e2c21599d236a79a934dd740a49b86aa6da1a696
Parents: fe17ef7 6b1cec2
Author: Thomas Groh 
Authored: Fri Nov 11 18:03:02 2016 -0800
Committer: Thomas Groh 
Committed: Fri Nov 11 18:03:02 2016 -0800

--
 .../beam/runners/direct/DirectRunner.java   | 27 +
 .../direct/ExecutorServiceParallelExecutor.java | 15 +++--
 .../beam/runners/direct/DirectRunnerTest.java   | 62 
 3 files changed, 73 insertions(+), 31 deletions(-)
--




[1/2] incubator-beam git commit: Actually Split Root Transforms

2016-11-11 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master fe17ef7f8 -> e2c21599d


Actually Split Root Transforms

Permit the ExecutorServiceParallelExecutor to control its own
ExecutorService by passing only a TargetParallelism parameter. Split
roots into the greater of 3 or the target parallelism.
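
The two ideas in this commit message can be sketched in isolation. The
following is a minimal, hypothetical illustration and not the DirectRunner
code: the class RootSplitSketch and its methods are invented names, and a
plain numeric range stands in for a root source. It assumes only what the
message states: the executor builds its own thread pool from a target
parallelism, and a root is split into the greater of 3 or that parallelism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch; the names here do not exist in Beam. */
final class RootSplitSketch {

  /** Number of pieces a root is split into: the greater of 3 or the target parallelism. */
  static int desiredSplits(int targetParallelism) {
    return Math.max(3, targetParallelism);
  }

  /** Splits [0, size) into at most numSplits contiguous, non-empty [start, end) ranges. */
  static List<long[]> splitRange(long size, int numSplits) {
    List<long[]> ranges = new ArrayList<>();
    long start = 0;
    for (int i = 0; i < numSplits && start < size; i++) {
      long remaining = size - start;
      int splitsLeft = numSplits - i;
      long length = (remaining + splitsLeft - 1) / splitsLeft; // ceiling division
      ranges.add(new long[] {start, start + length});
      start += length;
    }
    return ranges;
  }

  /**
   * Sums the integers in [0, size) by reading each split as a separate task.
   * The pool is created and shut down here, so callers pass only a parallelism.
   */
  static long sumInParallel(long size, int targetParallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(targetParallelism);
    try {
      List<Future<Long>> futures = new ArrayList<>();
      for (long[] range : splitRange(size, desiredSplits(targetParallelism))) {
        final long from = range[0];
        final long to = range[1];
        Callable<Long> task = () -> {
          long sum = 0;
          for (long i = from; i < to; i++) {
            sum += i;
          }
          return sum;
        };
        futures.add(pool.submit(task));
      }
      long total = 0;
      for (Future<Long> future : futures) {
        total += future.get();
      }
      return total;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("split read failed", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(desiredSplits(1));
    System.out.println(sumInParallel(100, 2));
  }
}
```

Passing a parallelism rather than a ready-made ExecutorService keeps thread
lifecycle decisions inside the executor, which is the design choice the
commit message describes.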


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b1cec29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b1cec29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b1cec29

Branch: refs/heads/master
Commit: 6b1cec2930a199bba2c65d379116c746532c4148
Parents: fe17ef7
Author: Thomas Groh 
Authored: Thu Nov 10 13:47:40 2016 -0800
Committer: Thomas Groh 
Committed: Fri Nov 11 16:54:11 2016 -0800

--
 .../beam/runners/direct/DirectRunner.java   | 27 +
 .../direct/ExecutorServiceParallelExecutor.java | 15 +++--
 .../beam/runners/direct/DirectRunnerTest.java   | 62 
 3 files changed, 73 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f4aeb3e..c9a7864 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,8 +29,6 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
@@ -258,7 +256,6 @@ public class DirectRunner
   

   private final DirectOptions options;
   private final Set enabledEnforcements;
-  private Supplier<ExecutorService> executorServiceSupplier;
   private Supplier clockSupplier = new NanosOffsetClockSupplier();
 
   public static DirectRunner fromOptions(PipelineOptions options) {
@@ -268,7 +265,6 @@ public class DirectRunner
   private DirectRunner(DirectOptions options) {
 this.options = options;
 this.enabledEnforcements = Enforcement.enabled(options);
-this.executorServiceSupplier = new FixedThreadPoolSupplier(options);
   }
 
   /**
@@ -326,14 +322,11 @@ public class DirectRunner
 consumerTrackingVisitor.getStepNames(),
 consumerTrackingVisitor.getViews());
 
-// independent executor service for each run
-ExecutorService executorService = executorServiceSupplier.get();
-
 RootInputProvider rootInputProvider = 
RootProviderRegistry.defaultRegistry(context);
 TransformEvaluatorRegistry registry = 
TransformEvaluatorRegistry.defaultRegistry(context);
 PipelineExecutor executor =
 ExecutorServiceParallelExecutor.create(
-executorService,
+options.getTargetParallelism(),
 consumerTrackingVisitor.getValueToConsumers(),
 keyedPValueVisitor.getKeyedPValues(),
 rootInputProvider,
@@ -470,24 +463,6 @@ public class DirectRunner
   }
 
   /**
-   * A {@link Supplier} that creates a {@link ExecutorService} based on
-   * {@link Executors#newFixedThreadPool(int)}.
-   */
-  private static class FixedThreadPoolSupplier implements 
Supplier<ExecutorService> {
-private final DirectOptions options;
-
-private FixedThreadPoolSupplier(DirectOptions options) {
-  this.options = options;
-}
-
-@Override
-public ExecutorService get() {
-  return Executors.newFixedThreadPool(options.getTargetParallelism());
-}
-  }
-
-
-  /**
* A {@link Supplier} that creates a {@link NanosOffsetClock}.
*/
   private static class NanosOffsetClockSupplier implements Supplier {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 30fc417..0bb3d01 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/m

Jenkins build is still unstable: beam_PostCommit_RunnableOnService_SparkLocal #126

2016-11-11 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #819

2016-11-11 Thread Apache Jenkins Server
See 




Jenkins build became unstable: beam_PostCommit_RunnableOnService_SparkLocal #125

2016-11-11 Thread Apache Jenkins Server
See 




[GitHub] incubator-beam pull request #1353: Liberates ReduceFnRunner from WindowingIn...

2016-11-11 Thread jkff
GitHub user jkff opened a pull request:

https://github.com/apache/incubator-beam/pull/1353

Liberates ReduceFnRunner from WindowingInternals, and lets 
WindowingInternals do windowed side outputs

- Introduces WindowingInternals.sideOutputWindowedValue (will be necessary 
for Splittable DoFn)
- Implements it properly in all runners (required some minor refactoring in 
Spark and Flink ProcessContext implementations)
- Introduces "OutputWindowedValue" interface and "SideInputAccess" 
interfaces, and uses them in ReduceFnRunner instead of directly using 
WindowingInternals.
- Introduces adapters from WindowingInternals to these two interfaces, for 
use in OldDoFn contexts
- Moves some StateContext functions into ReduceFnContextFactory, because 
they make more sense in runners-core than in sdk (because they are only invoked 
by different runners). The only remaining StateContexts function is 
nullContext, but I couldn't figure out an easy way to move it into runners-core 
and gave up (however in fact I'm not sure its current usages are correct at 
all...)

R: @kennknowles (for bulk of the code and as committer)
CC: @aljoscha @amitsela (for the minor refactorings in respective runners)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkff/incubator-beam 
reducefn-windowing-internals

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1353


commit 14dd49e02ab2eabb7c7e42cb1e9cab29f7cea44e
Author: Eugene Kirpichov 
Date:   2016-11-11T02:40:53Z

Liberates ReduceFnRunner from WindowingInternals

commit d6c62454bb822063c863d2bcd1f2436453b94864
Author: Eugene Kirpichov 
Date:   2016-11-12T00:37:42Z

Refactor FlinkProcessContext more cleanly into single- and multi-output 
versions

commit 14e5847f738b3d89808d5d612684fcddffd51b7f
Author: Eugene Kirpichov 
Date:   2016-11-12T00:57:27Z

Refactor SparkProcessContext more cleanly into single- and multi-output 
versions






[jira] [Resolved] (BEAM-924) Findbugs doesn't pass in Direct Runner

2016-11-11 Thread Thomas Groh (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Groh resolved BEAM-924.
--
   Resolution: Fixed
Fix Version/s: 0.4.0-incubating

> Findbugs doesn't pass in Direct Runner
> --
>
> Key: BEAM-924
> URL: https://issues.apache.org/jira/browse/BEAM-924
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Daniel Halperin
>Assignee: Thomas Groh
> Fix For: 0.4.0-incubating
>
>
> {code}
> [INFO] AggregatorContainer$AggregatorInfo.accumulator not guarded against 
> concurrent access; locked 71% of time 
> [org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo, 
> org.apache.beam.runners.direct.AggregatorContainer$AggregatorInfo] 
> Unsynchronized access at AggregatorContainer.java:[line 80]
> Unsynchronized access at AggregatorContainer.java:[line 79]
> Unsynchronized access at AggregatorContainer.java:[line 81]
> Synchronized access at AggregatorContainer.java:[line 78]
> Synchronized access at AggregatorContainer.java:[line 79]
> Synchronized access at AggregatorContainer.java:[line 81]
> Synchronized access at AggregatorContainer.java:[line 81]
> Synchronized access at AggregatorContainer.java:[line 59]
> Synchronized access at AggregatorContainer.java:[line 62]
> Synchronized access at AggregatorContainer.java:[line 62]
> Synchronized access at AggregatorContainer.java:[line 60]
> Synchronized access at AggregatorContainer.java:[line 66]
> Synchronized access at AggregatorContainer.java:[line 66]
> [INFO] bundle must be non-null but is marked as nullable [org.apache.beam.runners.direct.ExecutorServiceParallelExecutor] At ExecutorServiceParallelExecutor.java:[lines 185-186]
> [INFO] Exceptional return value of java.util.concurrent.BlockingQueue.offer(Object) ignored in org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$MonitorRunnable.run() [org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$MonitorRunnable] At ExecutorServiceParallelExecutor.java:[line 412]
> [INFO] org.apache.beam.runners.direct.WatermarkCallbackExecutor$CallbackOrdering implements Comparator but not Serializable [org.apache.beam.runners.direct.WatermarkCallbackExecutor$CallbackOrdering] At WatermarkCallbackExecutor.java:[lines 132-138]
> [INFO] org.apache.beam.runners.direct.WatermarkManager$WindowedValueByTimestampComparator implements Comparator but not Serializable [org.apache.beam.runners.direct.WatermarkManager$WindowedValueByTimestampComparator] At WatermarkManager.java:[lines 1380-1385]
> [INFO] 
> {code}
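
Two of the warning classes above (a Comparator that is not Serializable, and
an ignored return value from BlockingQueue.offer) have common remedies that
can be shown outside Beam. The sketch below is an assumption-laden
illustration, not the fix applied in the Direct Runner: FindBugsFixSketch,
ByLength and offerOrFail are hypothetical names, and only standard
java.util and java.util.concurrent classes are used.

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch; the names here do not exist in Beam. */
final class FindBugsFixSketch {

  /** A Comparator that also implements Serializable, so sorted collections using it can be serialized. */
  static final class ByLength implements Comparator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(String a, String b) {
      return Integer.compare(a.length(), b.length());
    }
  }

  /** Checks the result of offer() and fails loudly instead of silently dropping the element. */
  static <T> void offerOrFail(BlockingQueue<T> queue, T element) {
    if (!queue.offer(element)) {
      throw new IllegalStateException("queue rejected element: " + element);
    }
  }

  public static void main(String[] args) {
    // A bounded queue can reject an offer once it is full.
    BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
    System.out.println(bounded.offer("first"));
    System.out.println(bounded.offer("second"));

    // A LinkedBlockingQueue created without a capacity is effectively unbounded.
    BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
    offerOrFail(unbounded, "first");
    offerOrFail(unbounded, "second");
    System.out.println(unbounded.size());
  }
}
```

With a bounded queue the boolean returned by offer() carries real
information, which is why ignoring it is flagged; with an unbounded queue a
rejected offer is not expected in practice, but checking the result still
documents the assumption.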





[jira] [Commented] (BEAM-924) Findbugs doesn't pass in Direct Runner

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658681#comment-15658681
 ] 

ASF GitHub Bot commented on BEAM-924:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1325







[2/2] incubator-beam git commit: This closes #1325

2016-11-11 Thread tgroh
This closes #1325


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fe17ef7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fe17ef7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fe17ef7f

Branch: refs/heads/master
Commit: fe17ef7f8f5a8037d0aeae8841b424cb76e86dcb
Parents: f0f4af5 bfa4e4e
Author: Thomas Groh 
Authored: Fri Nov 11 16:51:37 2016 -0800
Committer: Thomas Groh 
Committed: Fri Nov 11 16:51:37 2016 -0800

--
 runners/direct-java/pom.xml | 13 ---
 .../runners/direct/AggregatorContainer.java |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java | 41 
 .../direct/WatermarkCallbackExecutor.java   |  4 +-
 .../beam/runners/direct/WatermarkManager.java   |  4 +-
 5 files changed, 32 insertions(+), 32 deletions(-)
--




[1/2] incubator-beam git commit: Fix FindBugs Errors in the Direct Runner

2016-11-11 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master f0f4af581 -> fe17ef7f8


Fix FindBugs Errors in the Direct Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa4e4ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa4e4ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa4e4ec

Branch: refs/heads/master
Commit: bfa4e4ece0091efc635c5c62c3eaf955f597fa39
Parents: f0f4af5
Author: Thomas Groh 
Authored: Wed Nov 9 13:24:19 2016 -0800
Committer: Thomas Groh 
Committed: Fri Nov 11 16:49:09 2016 -0800

--
 runners/direct-java/pom.xml | 13 ---
 .../runners/direct/AggregatorContainer.java |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java | 41 
 .../direct/WatermarkCallbackExecutor.java   |  4 +-
 .../beam/runners/direct/WatermarkManager.java   |  4 +-
 5 files changed, 32 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/pom.xml
--
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 43cf3c0..8983b1c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -40,19 +40,6 @@
   
 
 
-
-  
-
-
-  org.codehaus.mojo
-  findbugs-maven-plugin
-  
-true
-  
-
-  
-
-
 
   
 org.apache.maven.plugins

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index 7b6bc64..e86bc3e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -43,7 +43,7 @@ public class AggregatorContainer {
 private final String name;
 private final CombineFn combiner;
 @GuardedBy("this")
-private AccumT accumulator = null;
+private volatile AccumT accumulator = null;
 private boolean committed = false;
 
 private AggregatorInfo(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index d1ffea1..30fc417 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
@@ -30,12 +32,12 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -142,7 +144,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
 
CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
 
 this.allUpdates = new ConcurrentLinkedQueue<>();
-this.visibleUpdates = new ArrayBlockingQueue<>(20);
+this.visibleUpdates = new LinkedBlockingQueue<>();
 
 parallelExecutorService = 
TransformExecutorServices.parallel(executorService);
 defaultCompletionCallback =
@@ -180,7 +182,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
   @SuppressWarnings("unchecked")
   public void scheduleConsumption(
   AppliedPTransform consumer,
-  @Nullable CommittedBundle bundle,
+  Committ

[GitHub] incubator-beam pull request #1325: [BEAM-924] Fix FindBugs Errors in the Dir...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1325




Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #818

2016-11-11 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-223) KafkaIO: don't use SerializableCoder

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658606#comment-15658606
 ] 

ASF GitHub Bot commented on BEAM-223:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1312


> KafkaIO: don't use SerializableCoder
> 
>
> Key: BEAM-223
> URL: https://issues.apache.org/jira/browse/BEAM-223
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Daniel Halperin
>Assignee: Raghu Angadi
>
> Reuven says:
> {quote}
> I noticed that we're using SerializableCoder for the checkpoint mark in 
> KafkaIO. This is generally highly discouraged in streaming pipelines. 
> Partially because it's inefficient, but more importantly because Java 
> serialization is not guaranteed to be stable. If a user updates their 
> pipeline, the new pipeline may not be able to decode the existing checkpoint 
> marks; this will either cause exceptions to be thrown, or data loss.
> {quote}





[GitHub] incubator-beam pull request #1312: [BEAM-223] Use Avro serializer for Kafka ...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1312




[1/2] incubator-beam git commit: Use Avro serializer for Kafka checkpoint mark.

2016-11-11 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/master b25131422 -> f0f4af581


Use Avro serializer for Kafka checkpoint mark.

This is more portable.
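
To illustrate why an explicit field-by-field encoding is more portable than
Java serialization, here is a minimal sketch. It is not the AvroCoder-based
change made in this commit: it hand-writes the three fields (topic,
partition, next offset) with DataOutputStream, and the class
PartitionMarkEncodingSketch is a hypothetical name. The point it makes is
only that the byte layout depends on the declared fields, not on the class's
serialVersionUID or on JVM serialization internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of an explicit checkpoint encoding; not Beam's AvroCoder. */
final class PartitionMarkEncodingSketch {
  final String topic;
  final int partition;
  final long nextOffset;

  PartitionMarkEncodingSketch(String topic, int partition, long nextOffset) {
    this.topic = topic;
    this.partition = partition;
    this.nextOffset = nextOffset;
  }

  /** Writes the fields in a fixed order: topic (UTF), partition (int), next offset (long). */
  byte[] encode() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(topic);
      out.writeInt(partition);
      out.writeLong(nextOffset);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static PartitionMarkEncodingSketch decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      String topic = in.readUTF();
      int partition = in.readInt();
      long nextOffset = in.readLong();
      return new PartitionMarkEncodingSketch(topic, partition, nextOffset);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    PartitionMarkEncodingSketch mark = new PartitionMarkEncodingSketch("events", 3, 42L);
    PartitionMarkEncodingSketch copy = decode(mark.encode());
    System.out.println(copy.topic + "/" + copy.partition + "@" + copy.nextOffset);
  }
}
```

A schema-based coder such as Avro gives the same property without
hand-written read and write code, which is presumably why the commit
replaces TopicPartition with plain String, int and long fields.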


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937ac3b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937ac3b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937ac3b2

Branch: refs/heads/master
Commit: 937ac3b2ddc60fd9446440c9354139c6234cb625
Parents: b251314
Author: Raghu Angadi 
Authored: Tue Nov 8 07:08:32 2016 -0800
Committer: Dan Halperin 
Committed: Fri Nov 11 16:14:07 2016 -0800

--
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  | 32 +---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 18 ++-
 2 files changed, 32 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937ac3b2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 4f9e96f..763a98a 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -20,19 +20,21 @@ package org.apache.beam.sdk.io.kafka;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.kafka.common.TopicPartition;
 
 /**
  * Checkpoint for an unbounded KafkaIO.Read. Consists of Kafka topic name, 
partition id,
  * and the latest offset consumed so far.
  */
-@DefaultCoder(SerializableCoder.class)
-public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, 
Serializable {
+@DefaultCoder(AvroCoder.class)
+public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
+
+  private List<PartitionMark> partitions;
 
-  private final List<PartitionMark> partitions;
+  private KafkaCheckpointMark() {} // for Avro
 
   public KafkaCheckpointMark(List<PartitionMark> partitions) {
 this.partitions = partitions;
@@ -55,16 +57,24 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark, Seri
* for a single partition.
*/
   public static class PartitionMark implements Serializable {
-private final TopicPartition topicPartition;
-private final long nextOffset;
+private String topic;
+private int partition;
+private long nextOffset;
+
+private PartitionMark() {} // for Avro
 
-public PartitionMark(TopicPartition topicPartition, long offset) {
-  this.topicPartition = topicPartition;
+public PartitionMark(String topic, int partition, long offset) {
+  this.topic = topic;
+  this.partition = partition;
   this.nextOffset = offset;
 }
 
-public TopicPartition getTopicPartition() {
-  return topicPartition;
+public String getTopic() {
+  return topic;
+}
+
+public int getPartition() {
+  return partition;
 }
 
 public long getNextOffset() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937ac3b2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
--
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 834104e..4212d59 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,11 +49,12 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -721,7 +722,7 @@ public class KafkaIO {
 
 @Override
 public Coder getCheckpointMarkCoder() {
-  return SerializableCoder.of(KafkaCheckpointMark.class);
+  return AvroCoder.of(KafkaCheckpointMark.cla

[2/2] incubator-beam git commit: Closes #1312

2016-11-11 Thread dhalperi
Closes #1312


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0f4af58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0f4af58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0f4af58

Branch: refs/heads/master
Commit: f0f4af581f2cb6317ded367d4ddda35df94a7451
Parents: b251314 937ac3b
Author: Dan Halperin 
Authored: Fri Nov 11 16:14:15 2016 -0800
Committer: Dan Halperin 
Committed: Fri Nov 11 16:14:15 2016 -0800

--
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  | 32 +---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 18 ++-
 2 files changed, 32 insertions(+), 18 deletions(-)
--




Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #817

2016-11-11 Thread Apache Jenkins Server
See 




[1/2] incubator-beam git commit: WordCountIT: uses input with fewer keys.

2016-11-11 Thread davor
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6814a99c2 -> b25131422


WordCountIT: uses input with fewer keys.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95ca1add
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95ca1add
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95ca1add

Branch: refs/heads/master
Commit: 95ca1add2df97a5bc6fa96c0810b8d6a0cb0c00c
Parents: 6814a99
Author: Pei He 
Authored: Fri Nov 11 14:13:02 2016 -0800
Committer: Davor Bonaci 
Committed: Fri Nov 11 16:02:55 2016 -0800

--
 .../java/org/apache/beam/examples/WindowedWordCountIT.java | 6 --
 .../src/test/java/org/apache/beam/examples/WordCountIT.java| 5 -
 2 files changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95ca1add/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
--
diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java 
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index a5113c8..5d77dd5 100644
--- 
a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ 
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -38,7 +38,9 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class WindowedWordCountIT {
 
-  private static final String DEFAULT_OUTPUT_CHECKSUM = 
"ff54f6f42b2afeb146206c1e8e915deaee0362b4";
+  private static final String DEFAULT_INPUT =
+  "gs://apache-beam-samples/shakespeare/winterstale-personae";
+  private static final String DEFAULT_OUTPUT_CHECKSUM = 
"cd5b52939257e12428a9fa085c32a84dd209b180";
 
   /**
* Options for the {@link WindowedWordCount} Integration Test.
@@ -67,7 +69,7 @@ public class WindowedWordCountIT {
 WindowedWordCountITOptions options =
 
TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
 options.setStreaming(isStreaming);
-options.setTestTimeoutSeconds(1200L);
+options.setInputFile(DEFAULT_INPUT);
 
 // Note: currently unused because the example writes to BigQuery, but 
WindowedWordCount.Options
 // are tightly coupled to WordCount.Options, where the option is required.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95ca1add/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
--
diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java 
b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index 487f04b..f2afe6a 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -36,7 +36,9 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class WordCountIT {
 
-  private static final String DEFAULT_OUTPUT_CHECKSUM = 
"8ae94f799f97cfd1cb5e8125951b32dfb52e1f12";
+  private static final String DEFAULT_INPUT =
+  "gs://apache-beam-samples/shakespeare/winterstale-personae";
+  private static final String DEFAULT_OUTPUT_CHECKSUM = 
"508517575eba8d8d5a54f7f0080a00951cfe84ca";
 
   /**
* Options for the WordCount Integration Test.
@@ -56,6 +58,7 @@ public class WordCountIT {
   public void testE2EWordCount() throws Exception {
 WordCountITOptions options = 
TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
 
+options.setInputFile(DEFAULT_INPUT);
 options.setOutput(IOChannelUtils.resolve(
 options.getTempRoot(),
 String.format("WordCountIT-%tF-%

[GitHub] incubator-beam pull request #1351: WordCountIT: uses input file with fewer k...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1351


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: This closes #1351

2016-11-11 Thread davor
This closes #1351


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2513142
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2513142
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2513142

Branch: refs/heads/master
Commit: b251314220861abcd4c64f3f70a1f1434885e27e
Parents: 6814a99 95ca1ad
Author: Davor Bonaci 
Authored: Fri Nov 11 16:04:36 2016 -0800
Committer: Davor Bonaci 
Committed: Fri Nov 11 16:04:36 2016 -0800

--
 .../java/org/apache/beam/examples/WindowedWordCountIT.java | 6 --
 .../src/test/java/org/apache/beam/examples/WordCountIT.java| 5 -
 2 files changed, 8 insertions(+), 3 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-954] FileBasedSink: remove unused code of TemporaryFileRetention.

2016-11-11 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/master 821923334 -> 6814a99c2


[BEAM-954] FileBasedSink: remove unused code of TemporaryFileRetention.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a151127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a151127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a151127

Branch: refs/heads/master
Commit: 2a151127f04733e6a1f87914901ae6b88c329935
Parents: 8219233
Author: Pei He 
Authored: Wed Nov 9 20:12:24 2016 -0800
Committer: Dan Halperin 
Committed: Fri Nov 11 15:30:03 2016 -0800

--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 49 ++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 67 
 2 files changed, 21 insertions(+), 95 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a151127/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index e6c37de..2d058ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -279,12 +279,6 @@ public abstract class FileBasedSink extends Sink {
* Subclass implementations can change the file naming template by supplying a value for
* {@link FileBasedSink#fileNamingTemplate}.
*
-   * Temporary Bundle File Handling:
-   *
-   * {@link FileBasedSink.FileBasedWriteOperation#temporaryFileRetention} controls the behavior
-   * for managing temporary files. By default, temporary files will be removed. Subclasses can
-   * provide a different value to the constructor.
-   *
* Note that in the case of permanent failure of a bundle's write, no clean up of temporary
* files will occur.
*
@@ -294,23 +288,10 @@ public abstract class FileBasedSink extends Sink {
*/
   public abstract static class FileBasedWriteOperation extends WriteOperation {
 /**
- * Options for handling of temporary output files.
- */
-public enum TemporaryFileRetention {
-  KEEP,
-  REMOVE
-}
-
-/**
  * The Sink that this WriteOperation will write to.
  */
 protected final FileBasedSink sink;
 
-/**
- * Option to keep or remove temporary output files.
- */
-protected final TemporaryFileRetention temporaryFileRetention;
-
 /** Directory for temporary output files. */
 protected final String tempDirectory;
 
@@ -350,27 +331,14 @@ public abstract class FileBasedSink extends Sink {
 }
 
 /**
- * Construct a FileBasedWriteOperation.
- *
- * @param sink the FileBasedSink that will be used to configure this write operation.
- * @param tempDirectory the base directory to be used for temporary output files.
- */
-public FileBasedWriteOperation(FileBasedSink sink, String tempDirectory) {
-  this(sink, tempDirectory, TemporaryFileRetention.REMOVE);
-}
-
-/**
  * Create a new FileBasedWriteOperation.
  *
  * @param sink the FileBasedSink that will be used to configure this write operation.
  * @param tempDirectory the base directory to be used for temporary output files.
- * @param temporaryFileRetention defines how temporary files are handled.
  */
-public FileBasedWriteOperation(FileBasedSink sink, String tempDirectory,
-TemporaryFileRetention temporaryFileRetention) {
+public FileBasedWriteOperation(FileBasedSink sink, String tempDirectory) {
   this.sink = sink;
   this.tempDirectory = tempDirectory;
-  this.temporaryFileRetention = temporaryFileRetention;
 }
 
 /**
@@ -415,15 +383,12 @@ public abstract class FileBasedSink extends Sink {
   }
   copyToOutputFiles(files, options);
 
-  // Optionally remove temporary files.
-  if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
-// We remove the entire temporary directory, rather than specifically removing the files
-// from writerResults, because writerResults includes only successfully completed bundles,
-// and we'd like to clean up the failed ones too.
-// Note that due to GCS eventual consistency, matching files in the temp directory is also
-// currently non-perfect and may fail to delete some files.
-removeTemporaryFiles(files, options);
-  }
+  // We remove the entire temporary directory, rather than specifically removing the files
+  // from writerResults, because writerResults includes only successfully completed bundles,
+  // an
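
Note on the cleanup comment in the diff above: deleting the whole temporary directory also catches files left behind by failed bundles, which a list of successful writer results would miss. The sketch below illustrates that idea on a local filesystem with java.nio.file. It is only an illustration under that assumption: the class TempDirCleanupSketch and its methods are hypothetical, and the real FileBasedSink works through Beam's own file abstractions (including GCS), not java.nio.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical local-filesystem illustration; not the FileBasedSink implementation. */
final class TempDirCleanupSketch {

  /** Deletes tempDirectory and everything under it, whether or not a writer reported the file. */
  static void removeTemporaryDirectory(Path tempDirectory) throws IOException {
    if (!Files.exists(tempDirectory)) {
      return; // nothing to clean up
    }
    Files.walkFileTree(tempDirectory, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        Files.delete(file);
        return FileVisitResult.CONTINUE;
      }

      @Override
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
        if (exc != null) {
          throw exc;
        }
        Files.delete(dir); // the directory is empty by the time it is visited on the way out
        return FileVisitResult.CONTINUE;
      }
    });
  }

  /** Usage demo: creates a temp directory with two leftover files, removes it, reports success. */
  static boolean demo() {
    try {
      Path tempDirectory = Files.createTempDirectory("sink-temp-sketch");
      Files.createFile(tempDirectory.resolve("bundle-ok"));
      Files.createFile(tempDirectory.resolve("bundle-failed"));
      removeTemporaryDirectory(tempDirectory);
      return !Files.exists(tempDirectory);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

On an eventually consistent store, a directory listing can miss recently written files, which is the caveat raised in the removed comment; a local filesystem does not show that effect.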

[jira] [Commented] (BEAM-954) FileBasedSink: remove unused code of TemporaryFileRetention

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658514#comment-15658514
 ] 

ASF GitHub Bot commented on BEAM-954:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1331


> FileBasedSink: remove unused code of TemporaryFileRetention
> ---
>
> Key: BEAM-954
> URL: https://issues.apache.org/jira/browse/BEAM-954
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core, sdk-java-gcp
>Reporter: Pei He
>Assignee: Pei He
>
> TemporaryFileRetention option is not used.
> Removing it will simplify the redesign of IOChannelFactory.





[2/2] incubator-beam git commit: Closes #1331

2016-11-11 Thread dhalperi
Closes #1331


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6814a99c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6814a99c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6814a99c

Branch: refs/heads/master
Commit: 6814a99c22264e3c45864d7deb237108b1bd27d2
Parents: 8219233 2a15112
Author: Dan Halperin 
Authored: Fri Nov 11 15:30:04 2016 -0800
Committer: Dan Halperin 
Committed: Fri Nov 11 15:30:04 2016 -0800

--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 49 ++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 67 
 2 files changed, 21 insertions(+), 95 deletions(-)
--




[GitHub] incubator-beam pull request #1331: [BEAM-954] FileBasedSink: remove unused c...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1331




Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #816

2016-11-11 Thread Apache Jenkins Server
See 




Jenkins build is back to stable : beam_PostCommit_MavenVerify #1795

2016-11-11 Thread Apache Jenkins Server
See 



Jenkins build is still unstable: beam_PostCommit_MavenVerify #1794

2016-11-11 Thread Apache Jenkins Server
See 



[jira] [Commented] (BEAM-965) Source Transformations Don't Set Correct Output Type in Flink Streaming Runner

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658437#comment-15658437
 ] 

ASF GitHub Bot commented on BEAM-965:
-

GitHub user aljoscha opened a pull request:

https://github.com/apache/incubator-beam/pull/1352

[BEAM-965] Set Correct Output Type on Sources in Flink Stream Runner

R: @mxm 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/incubator-beam flink-fix-sources

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1352


commit 40825e125be15c1c4c77b66b36c62cec24c6ddd8
Author: Aljoscha Krettek 
Date:   2016-11-11T22:58:16Z

[BEAM-965] Set Correct Output Type on Sources in Flink Stream Runner




> Source Transformations Don't Set Correct Output Type in Flink Streaming Runner
> --
>
> Key: BEAM-965
> URL: https://issues.apache.org/jira/browse/BEAM-965
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.4.0-incubating
>
>






[GitHub] incubator-beam pull request #1352: [BEAM-965] Set Correct Output Type on Sou...

2016-11-11 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/incubator-beam/pull/1352

[BEAM-965] Set Correct Output Type on Sources in Flink Stream Runner

R: @mxm 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/incubator-beam flink-fix-sources

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1352


commit 40825e125be15c1c4c77b66b36c62cec24c6ddd8
Author: Aljoscha Krettek 
Date:   2016-11-11T22:58:16Z

[BEAM-965] Set Correct Output Type on Sources in Flink Stream Runner






[jira] [Updated] (BEAM-965) Source Transformations Don't Set Correct Output Type in Flink Streaming Runner

2016-11-11 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-965:
--
Summary: Source Transformations Don't Set Correct Output Type in Flink 
Streaming Runner  (was: Source Operations Don't Set Correct Output Type in 
Flink Streaming Runner)

> Source Transformations Don't Set Correct Output Type in Flink Streaming Runner
> --
>
> Key: BEAM-965
> URL: https://issues.apache.org/jira/browse/BEAM-965
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.4.0-incubating
>
>






Jenkins build is back to stable : beam_PostCommit_RunnableOnService_SparkLocal #122

2016-11-11 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-945) BigQueryIO.parseTableSpec is package-private

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658416#comment-15658416
 ] 

ASF GitHub Bot commented on BEAM-945:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1345


> BigQueryIO.parseTableSpec is package-private
> 
>
> Key: BEAM-945
> URL: https://issues.apache.org/jira/browse/BEAM-945
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 0.3.0-incubating
>Reporter: Andrew Martin
>Assignee: Daniel Halperin
>Priority: Blocker
>
> It was made package-private in this commit:
> https://github.com/apache/incubator-beam/commit/dbbcbe604e167b306feac2443bec85f2da3c1dd6
> Is there a good reason for it not to be public? If not, I will submit a PR to
> change it back.





[GitHub] incubator-beam pull request #1345: [BEAM-945] Make BigQueryIO.parseTableSpec...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1345




[2/2] incubator-beam git commit: Make BigQueryIO.parseTableSpec public

2016-11-11 Thread dhalperi
Make BigQueryIO.parseTableSpec public


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d622323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d622323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d622323

Branch: refs/heads/master
Commit: 3d62232389d44e669671585bc653581fbf1da62b
Parents: 703816d
Author: Andrew Martin 
Authored: Tue Nov 8 15:09:50 2016 -0500
Committer: Dan Halperin 
Committed: Fri Nov 11 14:50:31 2016 -0800

--
 .../main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d622323/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f30825f..7c9b3e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -290,7 +290,7 @@ public class BigQueryIO {
*
* If the project id is omitted, the default project id is used.
*/
-  static TableReference parseTableSpec(String tableSpec) {
+  public static TableReference parseTableSpec(String tableSpec) {
 Matcher match = TABLE_SPEC.matcher(tableSpec);
 if (!match.matches()) {
   throw new IllegalArgumentException(
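
Note on the method made public above: the hunk shows it matching the input against a TABLE_SPEC pattern and throwing IllegalArgumentException on a mismatch. The sketch below mimics that shape so callers can see what a public parser of this kind offers. The grammar used here (an optional "project:" prefix followed by "dataset.table") is a simplified assumption, and TableSpecSketch with its String[] return type is hypothetical; the actual pattern and the TableReference type live in BigQueryIO.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for a table-spec parser; not the BigQueryIO implementation. */
final class TableSpecSketch {

  // Assumed, simplified grammar: optional "project:" prefix, then "dataset.table".
  private static final Pattern TABLE_SPEC =
      Pattern.compile("(?:(?<project>[^:.]+):)?(?<dataset>[^:.]+)\\.(?<table>[^:.]+)");

  /** Returns {project, dataset, table}; project is null when the spec omits it. */
  static String[] parse(String tableSpec) {
    Matcher match = TABLE_SPEC.matcher(tableSpec);
    if (!match.matches()) {
      throw new IllegalArgumentException("Unexpected table spec: " + tableSpec);
    }
    return new String[] {match.group("project"), match.group("dataset"), match.group("table")};
  }
}
```

A null project in this sketch corresponds to the case the Javadoc describes, where the default project id is used when the spec omits one.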



[1/2] incubator-beam git commit: Closes #1345

2016-11-11 Thread dhalperi
Repository: incubator-beam
Updated Branches:
  refs/heads/master 703816dbd -> 821923334


Closes #1345


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82192333
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82192333
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82192333

Branch: refs/heads/master
Commit: 8219233342232f7c607f66f09bcf08cb51611a4c
Parents: 703816d 3d62232
Author: Dan Halperin 
Authored: Fri Nov 11 14:50:31 2016 -0800
Committer: Dan Halperin 
Committed: Fri Nov 11 14:50:31 2016 -0800

--
 .../main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[jira] [Created] (BEAM-965) Source Operations Don't Set Correct Output Type in Flink Streaming Runner

2016-11-11 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-965:
-

 Summary: Source Operations Don't Set Correct Output Type in Flink 
Streaming Runner
 Key: BEAM-965
 URL: https://issues.apache.org/jira/browse/BEAM-965
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 0.4.0-incubating








Jenkins build is still unstable: beam_PostCommit_RunnableOnService_FlinkLocal #815

2016-11-11 Thread Apache Jenkins Server
See 




[jira] [Closed] (BEAM-731) Replace DirectRunner with InProcessRunner

2016-11-11 Thread Ahmet Altay (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay closed BEAM-731.

   Resolution: Fixed
Fix Version/s: Not applicable

> Replace DirectRunner with InProcessRunner
> -
>
> Key: BEAM-731
> URL: https://issues.apache.org/jira/browse/BEAM-731
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py
>Reporter: Ahmet Altay
>Assignee: Ahmet Altay
>  Labels: sdk-consistency
> Fix For: Not applicable
>
>
> Remove the old DirectRunner and replace it with the new InProcessRunner.
> Keeping both runners adds overhead (testing, code maintenance, etc.).
> InProcessRunner has been available for a while and is tested enough to be the
> default runner.




