[GitHub] incubator-beam pull request #842: [BEAM-557] Fix repackaging exclude pattern...
GitHub user swegner opened a pull request: https://github.com/apache/incubator-beam/pull/842 [BEAM-557] Fix repackaging exclude pattern for guava-testlib Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- The previous fix was incorrect in that the exclude pattern requires `.*` at the end. Tested this iteration using `javap` to inspect generated `.class` files. You can merge this pull request into a Git repository by running: $ git pull https://github.com/swegner/incubator-beam guava-testlib Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/842.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #842 commit 2a8ceed623e7909e0d67b71263e255e10a1c7693 Author: Scott Wegner Date: 2016-08-17T16:28:44Z Fix repackaging exclude pattern for guava-testlib --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (BEAM-557) Test-scoped dependencies should be excluded from shading package relocation
[ https://issues.apache.org/jira/browse/BEAM-557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424846#comment-15424846 ] ASF GitHub Bot commented on BEAM-557: - GitHub user swegner opened a pull request: https://github.com/apache/incubator-beam/pull/842 [BEAM-557] Fix repackaging exclude pattern for guava-testlib --- The previous fix was incorrect in that the exclude pattern requires `.*` at the end. Tested this iteration using `javap` to inspect generated `.class` files.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/swegner/incubator-beam guava-testlib Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/842.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #842 commit 2a8ceed623e7909e0d67b71263e255e10a1c7693 Author: Scott Wegner Date: 2016-08-17T16:28:44Z Fix repackaging exclude pattern for guava-testlib > Test-scoped dependencies should be excluded from shading package relocation > --- > > Key: BEAM-557 > URL: https://issues.apache.org/jira/browse/BEAM-557 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Scott Wegner >Assignee: Scott Wegner >Priority: Minor > > Currently, guava-testlib is being relocated as part of the shading process, > but test-scope dependencies aren't bundled in the uber-jar. As a result, the > output JAR is unusable without recreating the same shading rules in a > consuming project. > Note that this does not affect our Maven test process because tests are run > on the unshaded JAR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
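The pull request text above says the exclude pattern needs a trailing `.*`. A minimal sketch of why, assuming Ant-style glob semantics in which `**.` spans zero or more package segments and `*` stays within one segment. `GlobSketch` and its helpers are hypothetical names written for illustration; this is not the shading plugin's actual matcher, only an approximation of the matching rule.

```java
import java.util.regex.Pattern;

final class GlobSketch {
  /**
   * Converts a dotted glob into a regex (simplified, assumed semantics):
   * "**." matches zero or more whole segments, "*" matches within one segment.
   */
  static Pattern toRegex(String glob) {
    StringBuilder regex = new StringBuilder();
    for (int i = 0; i < glob.length(); i++) {
      char c = glob.charAt(i);
      if (c != '*') {
        regex.append(Pattern.quote(String.valueOf(c)));
      } else if (glob.startsWith("**.", i)) {
        regex.append("(?:[^.]+\\.)*"); // zero or more "segment." groups
        i += 2; // skip the second '*' and the '.'
      } else if (glob.startsWith("**", i)) {
        regex.append(".*"); // trailing "**": anything, dots included
        i += 1;
      } else {
        regex.append("[^.]*"); // single '*': no dots allowed
      }
    }
    return Pattern.compile(regex.toString());
  }

  /** Returns true if the whole class name matches the glob. */
  static boolean matches(String glob, String className) {
    return toRegex(glob).matcher(className).matches();
  }

  public static void main(String[] args) {
    String testClass = "com.google.common.testing.EqualsTester";
    // Without the trailing ".*" the pattern names the package, not the classes in it.
    System.out.println(matches("com.google.common.**.testing", testClass)); // false
    System.out.println(matches("com.google.common.**.testing.*", testClass)); // true
  }
}
```

Under these assumed semantics, the shorter pattern matches only a name that ends at `testing`, so no class inside that package is excluded; the longer one matches every class directly inside any `testing` package under `com.google.common`.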
[1/2] incubator-beam git commit: Fix repackaging exclude pattern for guava-testlib
Repository: incubator-beam Updated Branches: refs/heads/master 9b4a464a3 -> 46097736b Fix repackaging exclude pattern for guava-testlib Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a8ceed6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a8ceed6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a8ceed6 Branch: refs/heads/master Commit: 2a8ceed623e7909e0d67b71263e255e10a1c7693 Parents: 9b4a464 Author: Scott Wegner Authored: Wed Aug 17 09:28:44 2016 -0700 Committer: Scott Wegner Committed: Wed Aug 17 09:28:44 2016 -0700 -- pom.xml| 2 +- runners/direct-java/pom.xml| 4 ++-- runners/google-cloud-dataflow-java/pom.xml | 4 ++-- sdks/java/core/pom.xml | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a8ceed6/pom.xml -- diff --git a/pom.xml b/pom.xml index b5f30c1..f39f94a 100644 --- a/pom.xml +++ b/pom.xml @@ -634,7 +634,7 @@ + excluding com.google.common.**.testing.* --> com.google.guava guava-testlib ${guava.version} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a8ceed6/runners/direct-java/pom.xml -- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 11481f1..e06883f 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -194,7 +194,7 @@ com.google.common -com.google.common.**.testing +com.google.common.**.testing.* org.apache.beam.runners.direct.repackaged.com.google.common @@ -269,7 +269,7 @@ + excluding com.google.common.**.testing.* --> com.google.guava guava-testlib test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a8ceed6/runners/google-cloud-dataflow-java/pom.xml -- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index d5485ef..00b5a9b 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ 
-198,7 +198,7 @@ com.google.common -com.google.common.**.testing +com.google.common.**.testing.* org.apache.beam.sdk.repackaged.com.google.common @@ -315,7 +315,7 @@ + excluding com.google.common.**.testing.* --> com.google.guava guava-testlib test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a8ceed6/sdks/java/core/pom.xml -- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index fddccea..78aec85 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -195,7 +195,7 @@ com.google.common -com.google.common.**.testing +com.google.common.**.testing.* org.apache.beam.sdk.repackaged.com.google.common @@ -426,7 +426,7 @@ + excluding com.google.common.**.testing.* --> com.google.guava guava-testlib test
[2/2] incubator-beam git commit: [BEAM-557] Fix repackaging exclude pattern for guava-testlib
[BEAM-557] Fix repackaging exclude pattern for guava-testlib This closes #842 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46097736 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46097736 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46097736 Branch: refs/heads/master Commit: 46097736b5af336b3f43a743eecc2a45041c6bf5 Parents: 9b4a464 2a8ceed Author: Luke Cwik Authored: Wed Aug 17 09:38:02 2016 -0700 Committer: Luke Cwik Committed: Wed Aug 17 09:38:02 2016 -0700 -- pom.xml| 2 +- runners/direct-java/pom.xml| 4 ++-- runners/google-cloud-dataflow-java/pom.xml | 4 ++-- sdks/java/core/pom.xml | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) --
[GitHub] incubator-beam pull request #842: [BEAM-557] Fix repackaging exclude pattern...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/842
[jira] [Commented] (BEAM-557) Test-scoped dependencies should be excluded from shading package relocation
[ https://issues.apache.org/jira/browse/BEAM-557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424916#comment-15424916 ] ASF GitHub Bot commented on BEAM-557: - Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/842 > Test-scoped dependencies should be excluded from shading package relocation > --- > > Key: BEAM-557 > URL: https://issues.apache.org/jira/browse/BEAM-557 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Scott Wegner >Assignee: Scott Wegner >Priority: Minor > > Currently, guava-testlib is being relocated as part of the shading process, > but test-scope dependencies aren't bundled in the uber-jar. As a result, the > output JAR is unusable without recreating the same shading rules in a > consuming project. > Note that this does not affect our Maven test process because tests are run > on the unshaded JAR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (BEAM-557) Test-scoped dependencies should be excluded from shading package relocation
[ https://issues.apache.org/jira/browse/BEAM-557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Cwik resolved BEAM-557. Resolution: Fixed Fix Version/s: 0.3.0-incubating > Test-scoped dependencies should be excluded from shading package relocation > --- > > Key: BEAM-557 > URL: https://issues.apache.org/jira/browse/BEAM-557 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Scott Wegner >Assignee: Scott Wegner >Priority: Minor > Fix For: 0.3.0-incubating > > > Currently, guava-testlib is being relocated as part of the shading process, > but test-scope dependencies aren't bundled in the uber-jar. As a result, the > output JAR is unusable without recreating the same shading rules in a > consuming project. > Note that this does not affect our Maven test process because tests are run > on the unshaded JAR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (BEAM-560) In JAXBCoder, use a pair of ThreadLocals to cache Marshaller/Unmarshaller
Thomas Groh created BEAM-560: Summary: In JAXBCoder, use a pair of ThreadLocals to cache Marshaller/Unmarshaller Key: BEAM-560 URL: https://issues.apache.org/jira/browse/BEAM-560 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Thomas Groh Assignee: Davor Bonaci Priority: Minor Marshallers and Unmarshallers are created per-element. Instead, they can be created per-thread and stored within a ThreadLocal which creates a new instance on calls to initialValue(), which allows them to be reused for all elements encoded or decoded by the coder. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
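The per-thread caching proposed in BEAM-560 could look roughly like the sketch below. It uses a hypothetical `ExpensiveCodec` class as a stand-in for a JAXB Marshaller (so the example runs without JAXB on the classpath); `PerThreadCacheSketch` and its method names are likewise illustrative and are not the JAXBCoder code. The issue asks for a pair of ThreadLocals, one for the Marshaller and one for the Unmarshaller; the second would follow the same pattern.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PerThreadCacheSketch {
  /** Hypothetical stand-in for a costly, non-thread-safe object such as a Marshaller. */
  static final class ExpensiveCodec {
    static final AtomicInteger CREATED = new AtomicInteger();

    ExpensiveCodec() {
      CREATED.incrementAndGet(); // count constructions to make the reuse visible
    }

    String encode(Object value) {
      return "<value>" + value + "</value>";
    }
  }

  // One instance per thread, built by initialValue() on that thread's first get().
  private static final ThreadLocal<ExpensiveCodec> CODEC =
      new ThreadLocal<ExpensiveCodec>() {
        @Override
        protected ExpensiveCodec initialValue() {
          return new ExpensiveCodec();
        }
      };

  /** Returns the codec cached for the calling thread. */
  static ExpensiveCodec codecForCurrentThread() {
    return CODEC.get();
  }

  /** Encodes one element, reusing the calling thread's codec instead of creating one. */
  static String encode(Object value) {
    return codecForCurrentThread().encode(value);
  }

  public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 1000; i++) {
      encode(i); // all 1000 calls share the main thread's codec
    }
    Thread other = new Thread(() -> encode("x")); // a second thread gets its own codec
    other.start();
    other.join();
    System.out.println("codecs created: " + ExpensiveCodec.CREATED.get());
  }
}
```

The trade-off is that each thread keeps its instance alive for as long as the thread lives, which is usually acceptable for worker threads that encode many elements.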
[jira] [Updated] (BEAM-493) All Runners Run WordCount in Precommit
[ https://issues.apache.org/jira/browse/BEAM-493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kuster updated BEAM-493: -- Summary: All Runners Run WordCount in Precommit (was: All Runners Run WordCount in Presubmit) > All Runners Run WordCount in Precommit > -- > > Key: BEAM-493 > URL: https://issues.apache.org/jira/browse/BEAM-493 > Project: Beam > Issue Type: Improvement >Reporter: Jason Kuster >Assignee: Jason Kuster > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-561) Add WindowedWordCountIT
[ https://issues.apache.org/jira/browse/BEAM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Kuster updated BEAM-561: -- Summary: Add WindowedWordCountIT (was: WindowedWordCountIT for Beam) > Add WindowedWordCountIT > --- > > Key: BEAM-561 > URL: https://issues.apache.org/jira/browse/BEAM-561 > Project: Beam > Issue Type: Bug >Reporter: Jason Kuster > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (BEAM-561) WindowedWordCountIT for Beam
Jason Kuster created BEAM-561: - Summary: WindowedWordCountIT for Beam Key: BEAM-561 URL: https://issues.apache.org/jira/browse/BEAM-561 Project: Beam Issue Type: Bug Reporter: Jason Kuster -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #843: [BEAM-440] add Create#empty
GitHub user JasonMWhite opened a pull request: https://github.com/apache/incubator-beam/pull/843 [BEAM-440] add Create#empty You can merge this pull request into a Git repository by running: $ git pull https://github.com/JasonMWhite/incubator-beam create_empty Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/843.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #843 commit d38b30204d1201b9146dd69bd312ec066314912d Author: Jason White Date: 2016-08-16T23:35:47Z add Create#empty
[jira] [Commented] (BEAM-440) Create.values() returns a type-unsafe Coder
[ https://issues.apache.org/jira/browse/BEAM-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425458#comment-15425458 ] ASF GitHub Bot commented on BEAM-440: - GitHub user JasonMWhite opened a pull request: https://github.com/apache/incubator-beam/pull/843 [BEAM-440] add Create#empty You can merge this pull request into a Git repository by running: $ git pull https://github.com/JasonMWhite/incubator-beam create_empty Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/843.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #843 commit d38b30204d1201b9146dd69bd312ec066314912d Author: Jason White Date: 2016-08-16T23:35:47Z add Create#empty > Create.values() returns a type-unsafe Coder > --- > > Key: BEAM-440 > URL: https://issues.apache.org/jira/browse/BEAM-440 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Daniel Halperin > Labels: newbie, starter > > Create.values() with no arguments will default to a VoidCoder, unless one is > set later with #setCoder(Coder). > Although it will encode its input correctly, this seems like a bad choice in > many cases. 
E.g., with Flatten: > PCollection> initial = p.apply("First", > Create.>of()); > PCollection> second = > p.apply("Second", Create.of("a", "b")).apply(ParDo.of(new > MyAvroDoFn())); > PCollectionList > .of(initial).and(second) > .apply(Flatten.>pCollections()); > This crashes trying to cast a KV from "Second" to a Void.class. > 1. Suggest throwing a warning in #getDefaultOutputCoder when defaulting to > VoidCoder for an empty elements list. Should this be an error? > 2. Suggest adding something like Create.empty(TypeDescriptor) to handle this > case properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BEAM-414) IntraBundleParallelization needs to be removed
[ https://issues.apache.org/jira/browse/BEAM-414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425499#comment-15425499 ] Scott Wegner commented on BEAM-414: --- Is the plan still to remove IntraBundleParallelization? I also noticed it does not support the new DoFn (only supports OldDoFn). If we don't plan on deprecating, we should update it to support the new DoFn. /cc [~kenn] > IntraBundleParallelization needs to be removed > -- > > Key: BEAM-414 > URL: https://issues.apache.org/jira/browse/BEAM-414 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Minor > Labels: newbie, starter > > IntraBundleParallelization needs to be removed because it does not work since > it breaks bundle processing semantics by expecting that context information > is not mutated by the runner between element processing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #844: Update DoFn javadocs to remove references ...
GitHub user swegner opened a pull request: https://github.com/apache/incubator-beam/pull/844 Update DoFn javadocs to remove references to OldDoFn and Dataflow --- Various components had references to `OldDoFn` where it would be more appropriate to describe the "new" `DoFn`. Along the way, updated a few Dataflow references to Beam. You can merge this pull request into a Git repository by running: $ git pull https://github.com/swegner/incubator-beam dofn-javadoc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/844.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #844 commit dcab8c2a270f5b3448c3f8d5b8a28aeafb8352ac Author: Scott Wegner Date: 2016-08-17T21:38:36Z Update DoFn javadocs to remove references to OldDoFn and Dataflow
[1/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature
Repository: incubator-beam
Updated Branches:
  refs/heads/master 46097736b -> 89367cfb1

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
new file mode 100644
index 000..1a26df2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.common.reflect.TypeToken;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/** Tests for {@link DoFnSignatures}. */
+@RunWith(JUnit4.class)
+public class DoFnSignaturesTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static class FakeDoFn extends DoFn {}
+
+  @SuppressWarnings({"unused"})
+  private void missingProcessContext() {}
+
+  @Test
+  public void testMissingProcessContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#missingProcessContext() must take a ProcessContext<> as its first argument");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("missingProcessContext"),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badProcessContext(String s) {}
+
+  @Test
+  public void testBadProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#badProcessContext(String) must take a ProcessContext<> as its first argument");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badProcessContext", String.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badExtraContext(DoFn.Context c, int n) {}
+
+  @Test
+  public void testBadExtraContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#badExtraContext(Context, int) must have a single argument of type Context");
+
+    DoFnSignatures.analyzeBundleMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badExtraContext", DoFn.Context.class, int.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badExtraProcessContext(DoFn.ProcessContext c, Integer n) {}
+
+  @Test
+  public void testBadExtraProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Integer is not a valid context parameter for method "
+            + getClass().getName()
+            + "#badExtraProcessContext(ProcessContext, Integer)"
+            + ". Should be one of [BoundedWindow]");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass()
+            .getDeclaredMethod("badExtraProcessContext", DoFn.ProcessContext.class, Integer.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private int badReturnType() {
+    return 0;
+  }
+
+  @Test
+  public void testBadReturnType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void retu
[4/4] incubator-beam git commit: Closes #812
Closes #812 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89367cfb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89367cfb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89367cfb Branch: refs/heads/master Commit: 89367cfb19ae86d66441970277177512961d3b6a Parents: 4609773 fbf77f9 Author: bchambers Authored: Wed Aug 17 15:43:47 2016 -0700 Committer: bchambers Committed: Wed Aug 17 15:43:47 2016 -0700 -- .../org/apache/beam/sdk/transforms/DoFn.java| 17 +- .../beam/sdk/transforms/DoFnAdapters.java | 281 + .../beam/sdk/transforms/DoFnReflector.java | 1150 -- .../apache/beam/sdk/transforms/DoFnTester.java |2 +- .../org/apache/beam/sdk/transforms/ParDo.java |6 +- .../sdk/transforms/reflect/DoFnInvoker.java | 61 + .../sdk/transforms/reflect/DoFnInvokers.java| 506 .../sdk/transforms/reflect/DoFnSignature.java | 113 ++ .../sdk/transforms/reflect/DoFnSignatures.java | 321 + .../sdk/transforms/reflect/package-info.java| 23 + .../beam/sdk/transforms/DoFnReflectorTest.java | 822 - .../apache/beam/sdk/transforms/FlattenTest.java |4 +- .../dofnreflector/DoFnReflectorTestHelper.java | 116 -- .../transforms/reflect/DoFnInvokersTest.java| 498 .../reflect/DoFnInvokersTestHelper.java | 116 ++ .../transforms/reflect/DoFnSignaturesTest.java | 371 ++ .../transforms/DoFnInvokersBenchmark.java | 224 .../transforms/DoFnReflectorBenchmark.java | 232 18 files changed, 2529 insertions(+), 2334 deletions(-) --
[GitHub] incubator-beam pull request #812: Rewrites DoFnReflector to go via DoFnSigna...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/812
[2/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
new file mode 100644
index 000..6730140
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.auto.value.AutoValue;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
+ * context it requires, types of the input and output elements, etc.
+ *
+ * See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
+ */
+@AutoValue
+public abstract class DoFnSignature {
+  public abstract Class fnClass();
+
+  public abstract ProcessElementMethod processElement();
+
+  @Nullable
+  public abstract BundleMethod startBundle();
+
+  @Nullable
+  public abstract BundleMethod finishBundle();
+
+  @Nullable
+  public abstract LifecycleMethod setup();
+
+  @Nullable
+  public abstract LifecycleMethod teardown();
+
+  static DoFnSignature create(
+      Class fnClass,
+      ProcessElementMethod processElement,
+      @Nullable BundleMethod startBundle,
+      @Nullable BundleMethod finishBundle,
+      @Nullable LifecycleMethod setup,
+      @Nullable LifecycleMethod teardown) {
+    return new AutoValue_DoFnSignature(
+        fnClass,
+        processElement,
+        startBundle,
+        finishBundle,
+        setup,
+        teardown);
+  }
+
+  /** Describes a {@link DoFn.ProcessElement} method. */
+  @AutoValue
+  public abstract static class ProcessElementMethod {
+    enum Parameter {
+      BOUNDED_WINDOW,
+      INPUT_PROVIDER,
+      OUTPUT_RECEIVER
+    }
+
+    public abstract Method targetMethod();
+
+    public abstract List extraParameters();
+
+    static ProcessElementMethod create(Method targetMethod, List extraParameters) {
+      return new AutoValue_DoFnSignature_ProcessElementMethod(
+          targetMethod, Collections.unmodifiableList(extraParameters));
+    }
+
+    /** @return true if the reflected {@link DoFn} uses a Single Window. */
+    public boolean usesSingleWindow() {
+      return extraParameters().contains(Parameter.BOUNDED_WINDOW);
+    }
+  }
+
+  /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
+  @AutoValue
+  public abstract static class BundleMethod {
+    public abstract Method targetMethod();
+
+    static BundleMethod create(Method targetMethod) {
+      return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
+    }
+  }
+
+  /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
+  @AutoValue
+  public abstract static class LifecycleMethod {
+    public abstract Method targetMethod();
+
+    static LifecycleMethod create(Method targetMethod) {
+      return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
new file mode 100644
index 000..80b3b4f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licen
[3/4] incubator-beam git commit: Rewrites DoFnReflector to go via DoFnSignature
Rewrites DoFnReflector to go via DoFnSignature DoFnSignature encapsulates type information about a DoFn, in particular which arguments/features its methods actually use. Before this commit, DoFnReflector would parse/verify/generate code in one go; after this commit, these stages are separated: DoFnSignature encapsulates all information needed to generate the code. Additionally, removes the unnecessary genericity in the implementation of DoFnReflector's code generation for the very different methods processElement and start/finishBundle. The code is simpler if decomposed into utility functions, rather than attempting a uniform representation for different methods. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbf77f90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbf77f90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbf77f90 Branch: refs/heads/master Commit: fbf77f90e0391304a580178f99441256526c4b0e Parents: 4609773 Author: Eugene Kirpichov Authored: Tue Aug 9 17:16:00 2016 -0700 Committer: bchambers Committed: Wed Aug 17 15:43:46 2016 -0700 -- .../org/apache/beam/sdk/transforms/DoFn.java| 17 +- .../beam/sdk/transforms/DoFnAdapters.java | 281 + .../beam/sdk/transforms/DoFnReflector.java | 1150 -- .../apache/beam/sdk/transforms/DoFnTester.java |2 +- .../org/apache/beam/sdk/transforms/ParDo.java |6 +- .../sdk/transforms/reflect/DoFnInvoker.java | 61 + .../sdk/transforms/reflect/DoFnInvokers.java| 506 .../sdk/transforms/reflect/DoFnSignature.java | 113 ++ .../sdk/transforms/reflect/DoFnSignatures.java | 321 + .../sdk/transforms/reflect/package-info.java| 23 + .../beam/sdk/transforms/DoFnReflectorTest.java | 822 - .../apache/beam/sdk/transforms/FlattenTest.java |4 +- .../dofnreflector/DoFnReflectorTestHelper.java | 116 -- .../transforms/reflect/DoFnInvokersTest.java| 498 .../reflect/DoFnInvokersTestHelper.java | 116 ++ 
.../transforms/reflect/DoFnSignaturesTest.java | 371 ++ .../transforms/DoFnInvokersBenchmark.java | 224 .../transforms/DoFnReflectorBenchmark.java | 232 18 files changed, 2529 insertions(+), 2334 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 80b67af..2348783 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollectionView; @@ -247,7 +248,7 @@ public abstract class DoFn implements Serializable, HasDisplayD / - Map> aggregators = new HashMap<>(); + protected Map> aggregators = new HashMap<>(); /** * Protects aggregators from being created after initialization. @@ -283,7 +284,7 @@ public abstract class DoFn implements Serializable, HasDisplayD /** * Interface for runner implementors to provide implementations of extra context information. * - * The methods on this interface are called by {@link DoFnReflector} before invoking an + * The methods on this interface are called by {@link DoFnInvoker} before invoking an * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that * has indicated it needs the given extra context. 
* @@ -301,23 +302,23 @@ public abstract class DoFn implements Serializable, HasDisplayD BoundedWindow window(); /** - * A placeholder for testing purposes. The return type itself is package-private and not - * implemented. + * A placeholder for testing purposes. */ InputProvider inputProvider(); /** - * A placeholder for testing purposes. The return type itself is package-private and not - * implemented. + * A placeholder for testing purposes. */ OutputReceiver outputReceiver(); } - static in
[2/2] incubator-beam git commit: Replace ParDo with simpler transforms where possible
Replace ParDo with simpler transforms where possible There are a number of places in the Java SDK where we use ParDo.of(DoFn) when MapElements or other higher-level composites are applicable and readable. This change alters a number of those. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/236945d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/236945d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/236945d2 Branch: refs/heads/master Commit: 236945d2504b73de91f7292219e0b15a53e062f5 Parents: 89367cf Author: Kenneth Knowles Authored: Wed Jul 27 14:23:15 2016 -0700 Committer: bchambers Committed: Wed Aug 17 16:09:01 2016 -0700 -- .../org/apache/beam/sdk/transforms/Combine.java | 28 ++-- .../org/apache/beam/sdk/transforms/Count.java | 8 +++--- .../beam/sdk/transforms/FlatMapElements.java| 4 +-- .../org/apache/beam/sdk/transforms/Flatten.java | 12 - .../org/apache/beam/sdk/transforms/Keys.java| 8 +++--- .../org/apache/beam/sdk/transforms/KvSwap.java | 9 +++ .../apache/beam/sdk/transforms/MapElements.java | 16 --- .../beam/sdk/transforms/RemoveDuplicates.java | 8 +++--- .../org/apache/beam/sdk/transforms/Values.java | 8 +++--- .../apache/beam/sdk/transforms/WithKeys.java| 9 +++ .../beam/sdk/transforms/windowing/Window.java | 11 .../java/org/apache/beam/sdk/PipelineTest.java | 12 - .../java/org/apache/beam/sdk/io/WriteTest.java | 4 ++- .../beam/sdk/transforms/MapElementsTest.java| 8 +++--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 --- 15 files changed, 81 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 6ba3f8a..56c0bc4 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2121,14 +2121,14 @@ public class Combine { inputCoder.getValueCoder())) .setWindowingStrategyInternal(preCombineStrategy) .apply("PreCombineHot", Combine.perKey(hotPreCombine)) - .apply("StripNonce", ParDo.of( - new DoFn, AccumT>, - KV>>() { -@ProcessElement -public void processElement(ProcessContext c) { - c.output(KV.of( - c.element().getKey().getKey(), - InputOrAccum.accum(c.element().getValue(; + .apply("StripNonce", MapElements.via( + new SimpleFunction, AccumT>, + KV>>() { +@Override +public KV> apply(KV, AccumT> elem) { + return KV.of( + elem.getKey().getKey(), + InputOrAccum.accum(elem.getValue())); } })) .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)) @@ -2137,12 +2137,12 @@ public class Combine { PCollection>> preprocessedCold = split .get(cold) .setCoder(inputCoder) - .apply("PrepareCold", ParDo.of( - new DoFn, KV>>() { -@ProcessElement -public void processElement(ProcessContext c) { - c.output(KV.of(c.element().getKey(), - InputOrAccum.input(c.element().getValue(; + .apply("PrepareCold", MapElements.via( + new SimpleFunction, KV>>() { +@Override +public KV> apply(KV element) { + return KV.of(element.getKey(), + InputOrAccum.input(element.getValue())); } })) .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index ac59c76..195c5d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -107,10 +107,10 @@ public class
[1/2] incubator-beam git commit: Closes #756
Repository: incubator-beam Updated Branches: refs/heads/master 89367cfb1 -> d93ef2edd Closes #756 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d93ef2ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d93ef2ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d93ef2ed Branch: refs/heads/master Commit: d93ef2edd260a2077bc2ba6abd1ca02abd147a9a Parents: 89367cf 236945d Author: bchambers Authored: Wed Aug 17 16:09:01 2016 -0700 Committer: bchambers Committed: Wed Aug 17 16:09:01 2016 -0700 -- .../org/apache/beam/sdk/transforms/Combine.java | 28 ++-- .../org/apache/beam/sdk/transforms/Count.java | 8 +++--- .../beam/sdk/transforms/FlatMapElements.java| 4 +-- .../org/apache/beam/sdk/transforms/Flatten.java | 12 - .../org/apache/beam/sdk/transforms/Keys.java| 8 +++--- .../org/apache/beam/sdk/transforms/KvSwap.java | 9 +++ .../apache/beam/sdk/transforms/MapElements.java | 16 --- .../beam/sdk/transforms/RemoveDuplicates.java | 8 +++--- .../org/apache/beam/sdk/transforms/Values.java | 8 +++--- .../apache/beam/sdk/transforms/WithKeys.java| 9 +++ .../beam/sdk/transforms/windowing/Window.java | 11 .../java/org/apache/beam/sdk/PipelineTest.java | 12 - .../java/org/apache/beam/sdk/io/WriteTest.java | 4 ++- .../beam/sdk/transforms/MapElementsTest.java| 8 +++--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 --- 15 files changed, 81 insertions(+), 74 deletions(-) --
[GitHub] incubator-beam pull request #756: [BEAM-498] Replace ParDo with MapElements ...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/756
[jira] [Commented] (BEAM-498) Make DoFnWithContext the new DoFn
[ https://issues.apache.org/jira/browse/BEAM-498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425582#comment-15425582 ] ASF GitHub Bot commented on BEAM-498: - Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/756 > Make DoFnWithContext the new DoFn > - > > Key: BEAM-498 > URL: https://issues.apache.org/jira/browse/BEAM-498 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[2/4] incubator-beam git commit: addressed feedback
addressed feedback Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e088b7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e088b7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e088b7f Branch: refs/heads/master Commit: 0e088b7fcb2b35d7fdc5125d4dc66e9fa6ae7ffd Parents: da3081a Author: Pei He Authored: Wed Aug 17 13:56:37 2016 -0700 Committer: Luke Cwik Committed: Wed Aug 17 16:24:39 2016 -0700 -- .../beam/runners/dataflow/DataflowRunner.java | 6 -- .../runners/dataflow/DataflowRunnerTest.java| 21 +--- 2 files changed, 18 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 6f8180e..1a845ea 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -219,6 +219,9 @@ public class DataflowRunner extends PipelineRunner { // The limit of CreateJob request size. 
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024; + @VisibleForTesting + static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024; + private final Set> pcollectionsRequiringIndexedFormat; /** @@ -311,8 +314,7 @@ public class DataflowRunner extends PipelineRunner { } if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) { - dataflowOptions.setGcsUploadBufferSizeBytes( - AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT); + dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT); } return new DataflowRunner(dataflowOptions); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 6f1653b..58b9878 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -797,31 +797,38 @@ public class DataflowRunnerTest { } @Test - public void testGcsUploadBufferSizeDefault() throws IOException { + public void testGcsUploadBufferSizeIsUnsetForBatchWhenDefault() throws IOException { DataflowPipelineOptions batchOptions = buildPipelineOptions(); -DataflowRunner.fromOptions(batchOptions); +batchOptions.setRunner(DataflowRunner.class); +Pipeline.create(batchOptions); assertNull(batchOptions.getGcsUploadBufferSizeBytes()); + } + @Test + public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOException { DataflowPipelineOptions streamingOptions = buildPipelineOptions(); streamingOptions.setStreaming(true); 
-DataflowRunner.fromOptions(streamingOptions); +streamingOptions.setRunner(DataflowRunner.class); +Pipeline.create(streamingOptions); assertEquals( -AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT, +DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT, streamingOptions.getGcsUploadBufferSizeBytes().intValue()); } @Test - public void testGcsUploadBufferSize() throws IOException { + public void testGcsUploadBufferSizeUnchangedWhenNotDefault() throws IOException { int gcsUploadBufferSizeBytes = 12345678; DataflowPipelineOptions batchOptions = buildPipelineOptions(); batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); -DataflowRunner.fromOptions(batchOptions); +batchOptions.setRunner(DataflowRunner.class); +Pipeline.create(batchOptions); assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue()); DataflowPipelineOptions streamingOptions = buildPipelineOptions(); streamingOptions.setStreaming(true); streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); -D
[1/4] incubator-beam git commit: fix unused imports
Repository: incubator-beam Updated Branches: refs/heads/master d93ef2edd -> a07648bb6 fix unused imports Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9ff2e42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9ff2e42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9ff2e42 Branch: refs/heads/master Commit: d9ff2e42339e04358c66308bd292a5a460547f77 Parents: 0e088b7 Author: Pei He Authored: Wed Aug 17 14:30:23 2016 -0700 Committer: Luke Cwik Committed: Wed Aug 17 16:24:39 2016 -0700 -- .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 1 - .../java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 1 - 2 files changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9ff2e42/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 1a845ea..c4dd703 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -125,7 +125,6 @@ import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; import com.google.api.services.dataflow.model.WorkerPool; -import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9ff2e42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 58b9878..92a6bcb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -89,7 +89,6 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; -import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap;
[4/4] incubator-beam git commit: [BEAM-554] Set Gcs upload buffer size to 1M in DataflowRunner streaming mode
[BEAM-554] Set Gcs upload buffer size to 1M in DataflowRunner streaming mode This closes #828 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a07648bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a07648bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a07648bb Branch: refs/heads/master Commit: a07648bb6609ec7ca52721ae2d4a1b8f6ecdba71 Parents: d93ef2e d9ff2e4 Author: Luke Cwik Authored: Wed Aug 17 16:24:50 2016 -0700 Committer: Luke Cwik Committed: Wed Aug 17 16:24:50 2016 -0700 -- .../beam/runners/dataflow/DataflowRunner.java | 7 .../runners/dataflow/DataflowRunnerTest.java| 37 2 files changed, 44 insertions(+) --
[3/4] incubator-beam git commit: Set Gcs upload buffer size to 1M in streaming mode in DataflowRunner
Set Gcs upload buffer size to 1M in streaming mode in DataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/da3081a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/da3081a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/da3081a6 Branch: refs/heads/master Commit: da3081a68c82c6f22ee382dfe0ffe1bd6be5d0e2 Parents: d93ef2e Author: Pei He Authored: Mon Aug 15 12:22:11 2016 -0700 Committer: Luke Cwik Committed: Wed Aug 17 16:24:39 2016 -0700 -- .../beam/runners/dataflow/DataflowRunner.java | 6 .../runners/dataflow/DataflowRunnerTest.java| 31 2 files changed, 37 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 689..6f8180e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -125,6 +125,7 @@ import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; import com.google.api.services.dataflow.model.WorkerPool; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -309,6 +310,11 @@ public class DataflowRunner extends PipelineRunner { + "' invalid. 
Please make sure the value is non-negative."); } +if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) { + dataflowOptions.setGcsUploadBufferSizeBytes( + AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT); +} + return new DataflowRunner(dataflowOptions); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index d7deffd..6f1653b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -89,6 +89,7 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -795,6 +796,36 @@ public class DataflowRunnerTest { } } + @Test + public void testGcsUploadBufferSizeDefault() throws IOException { +DataflowPipelineOptions batchOptions = buildPipelineOptions(); +DataflowRunner.fromOptions(batchOptions); +assertNull(batchOptions.getGcsUploadBufferSizeBytes()); + +DataflowPipelineOptions streamingOptions = buildPipelineOptions(); +streamingOptions.setStreaming(true); +DataflowRunner.fromOptions(streamingOptions); +assertEquals( +AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT, 
+streamingOptions.getGcsUploadBufferSizeBytes().intValue()); + } + + @Test + public void testGcsUploadBufferSize() throws IOException { +int gcsUploadBufferSizeBytes = 12345678; +DataflowPipelineOptions batchOptions = buildPipelineOptions(); +batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); +DataflowRunner.fromOptions(batchOptions); +assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue()); + +DataflowPipelineOptions streamingOptions = buildPipelineOptions(); +streamingOptions.setStreaming(true); +streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); +DataflowRunner.fromOptio
[GitHub] incubator-beam pull request #828: [BEAM-554] Set Gcs upload buffer size to 1...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/828
[jira] [Commented] (BEAM-554) Dataflow runner to support bounded writes in streaming mode.
[ https://issues.apache.org/jira/browse/BEAM-554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425588#comment-15425588 ]

ASF GitHub Bot commented on BEAM-554:

Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/828

> Dataflow runner to support bounded writes in streaming mode.
>
> Key: BEAM-554
> URL: https://issues.apache.org/jira/browse/BEAM-554
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Pei He
> Assignee: Pei He
[jira] [Created] (BEAM-562) DoFn Reuse: Add new methods to DoFn
Ahmet Altay created BEAM-562:

Summary: DoFn Reuse: Add new methods to DoFn
Key: BEAM-562
URL: https://issues.apache.org/jira/browse/BEAM-562
Project: Beam
Issue Type: New Feature
Components: sdk-py
Reporter: Ahmet Altay
Assignee: Ahmet Altay

The Java SDK added setup and teardown methods to DoFns. This makes DoFns reusable and provides performance improvements. The Python SDK should add support for these new DoFn methods.

Proposal doc: https://docs.google.com/document/d/1LLQqggSePURt3XavKBGV7SZJYQ4NW8yCu63lBchzMRk/edit?ts=5771458f#
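The reuse that setup and teardown enable can be sketched roughly as follows. This is only an illustration: `ReusableFn`, `BundleRunner`, and `UppercaseFn` are hypothetical names invented for the sketch, not the Beam DoFn API, and the sketch is in Java (the SDK the issue refers to) rather than Python. It assumes setup runs once per instance, the bundle hooks run once per bundle, and teardown runs once at the end.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical lifecycle interface for illustration; this is not the Beam DoFn API. */
interface ReusableFn<I, O> {
  void setup();        // once per instance, before any bundle
  void startBundle();  // once per bundle
  O process(I element);
  void finishBundle(); // once per bundle
  void teardown();     // once per instance, after the last bundle
}

/** Hypothetical driver that reuses one function instance across several bundles. */
final class BundleRunner {
  static <I, O> List<O> run(ReusableFn<I, O> fn, List<List<I>> bundles) {
    List<O> output = new ArrayList<>();
    fn.setup();
    try {
      for (List<I> bundle : bundles) {
        fn.startBundle();
        for (I element : bundle) {
          output.add(fn.process(element));
        }
        fn.finishBundle();
      }
    } finally {
      // Teardown runs even if processing throws, so resources are released.
      fn.teardown();
    }
    return output;
  }
}

/** Example function that counts lifecycle calls so the reuse is observable. */
final class UppercaseFn implements ReusableFn<String, String> {
  int setupCalls;
  int bundleCalls;
  int teardownCalls;

  @Override public void setup() { setupCalls++; }
  @Override public void startBundle() { bundleCalls++; }
  @Override public String process(String element) { return element.toUpperCase(); }
  @Override public void finishBundle() {}
  @Override public void teardown() { teardownCalls++; }
}
```

The performance benefit mentioned in the issue would come from moving expensive initialization (for example, opening a connection) into the once-per-instance hook instead of repeating it for every bundle.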
[jira] [Commented] (BEAM-414) IntraBundleParallelization needs to be removed
[ https://issues.apache.org/jira/browse/BEAM-414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425597#comment-15425597 ]

Luke Cwik commented on BEAM-414:

[~pei...@gmail.com] was working on some refactoring of the examples which depended on IntraBundleParallelization. Once that went in, we could get rid of IntraBundleParallelization.

maven-archetypes/examples still needs to be updated with the examples refactoring work that [~pei...@gmail.com] has been working on, and PubSubFileInjector needs to be deleted. After that we should be able to remove IntraBundleParallelization.

> IntraBundleParallelization needs to be removed
>
> Key: BEAM-414
> URL: https://issues.apache.org/jira/browse/BEAM-414
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Minor
> Labels: newbie, starter
>
> IntraBundleParallelization needs to be removed because it does not work since
> it breaks bundle processing semantics by expecting that context information
> is not mutated by the runner between element processing.
[jira] [Created] (BEAM-563) DoFn Reuse: Update DirectRunner
Ahmet Altay created BEAM-563:

Summary: DoFn Reuse: Update DirectRunner
Key: BEAM-563
URL: https://issues.apache.org/jira/browse/BEAM-563
Project: Beam
Issue Type: New Feature
Components: sdk-py
Reporter: Ahmet Altay
Assignee: Ahmet Altay

https://issues.apache.org/jira/browse/BEAM-562 will add setup and teardown methods to DoFns. Update DirectRunner to add support for these new methods.
[jira] [Assigned] (BEAM-153) Support timeout in runner API
[ https://issues.apache.org/jira/browse/BEAM-153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pei He reassigned BEAM-153:

Assignee: Pei He

> Support timeout in runner API
>
> Key: BEAM-153
> URL: https://issues.apache.org/jira/browse/BEAM-153
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Eugene Kirpichov
> Assignee: Pei He
>
> Some users want to make sure that their pipeline doesn't run longer than X
> minutes (e.g. because sometimes it runs longer than that due to bugs, and in
> that case they'd rather auto-cancel it than incur the costs).
> The runner API should have a timeout option, so that if the pipeline isn't in
> a terminal state by then, it is automatically cancelled.
> Naturally, this only applies to batch pipelines.
> A simple way to implement this for a blocking runner (such as
> BlockingDataflowPipelineRunner) is a wrapper of the sort "start pipeline, and
> cancel it after timeout" inside run(). For a non-blocking runner this will
> require support on the underlying execution environment side.
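The "start pipeline, and cancel it after timeout" wrapper described for a blocking runner could look roughly like the sketch below. It is a minimal illustration under assumptions: `CancellableJob`, `TimeoutRunner`, and `SleepingJob` are hypothetical names, not Beam or Dataflow classes, and the job is assumed to expose a blocking wait plus a thread-safe cancel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical handle to a running job; not a Beam interface. */
interface CancellableJob {
  /** Blocks until the job reaches a terminal state. */
  void waitUntilFinish() throws InterruptedException;

  /** Requests cancellation; assumed safe to call from another thread. */
  void cancel();
}

/** Hypothetical wrapper: wait for the job, cancelling it if the timeout elapses first. */
final class TimeoutRunner {
  /** Returns true if the timeout fired and cancellation was requested. */
  static boolean runWithTimeout(CancellableJob job, long timeout, TimeUnit unit) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicBoolean timedOut = new AtomicBoolean(false);
    ScheduledFuture<?> canceller = scheduler.schedule(() -> {
      timedOut.set(true);
      job.cancel();
    }, timeout, unit);
    try {
      job.waitUntilFinish();
    } catch (InterruptedException e) {
      // The waiting thread was interrupted: cancel the job and preserve the flag.
      job.cancel();
      Thread.currentThread().interrupt();
    } finally {
      // Stop the timer if the job reached a terminal state on its own.
      canceller.cancel(false);
      scheduler.shutdownNow();
    }
    return timedOut.get();
  }
}

/** Stand-in job that "finishes" after workMillis unless cancelled earlier. */
final class SleepingJob implements CancellableJob {
  private final CountDownLatch cancelled = new CountDownLatch(1);
  private final long workMillis;

  SleepingJob(long workMillis) {
    this.workMillis = workMillis;
  }

  @Override
  public void waitUntilFinish() throws InterruptedException {
    // Returns when either the simulated work time elapses or cancel() is called.
    cancelled.await(workMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public void cancel() {
    cancelled.countDown();
  }
}
```

One caveat of this sketch: if the job finishes at almost the same instant the timer fires, the method may report a timeout even though the job completed. A real implementation would likely check the job's terminal state instead of a flag.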
[jira] [Created] (BEAM-564) Update source framework so that remaining and consumed number of split points can be reported
Chamikara Jayalath created BEAM-564:

Summary: Update source framework so that remaining and consumed number of split points can be reported
Key: BEAM-564
URL: https://issues.apache.org/jira/browse/BEAM-564
Project: Beam
Issue Type: Improvement
Components: sdk-py
Reporter: Chamikara Jayalath
Assignee: Chamikara Jayalath

We have to update the Python SDK source framework so that sources can report the consumed and remaining number of split points. Runners can use this information to determine how many ways a given source can still be split and parallelize reading accordingly.

The corresponding API for the Java SDK is here: https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L258
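As a rough illustration of what reporting split points means, the sketch below tracks them for a reader over a fixed list of blocks. `BlockReader` and its methods are hypothetical and deliberately simplified; they are not the Beam BoundedSource API, and the sketch assumes every block boundary is a split point and counts the current block as consumed.

```java
import java.util.List;

/** Hypothetical reader over a fixed list of blocks; each block is one split point. */
final class BlockReader {
  private final List<String> blocks;
  private int consumed = 0; // number of blocks returned so far, including the current one

  BlockReader(List<String> blocks) {
    this.blocks = blocks;
  }

  /** Moves to the next block; returns false once the input is exhausted. */
  boolean advance() {
    if (consumed >= blocks.size()) {
      return false;
    }
    consumed++;
    return true;
  }

  /** Returns the block most recently reached by advance(). */
  String current() {
    if (consumed == 0) {
      throw new IllegalStateException("advance() has not been called");
    }
    return blocks.get(consumed - 1);
  }

  /** Split points already handed to the caller. */
  long splitPointsConsumed() {
    return consumed;
  }

  /** Split points not yet read; an upper bound on further useful splits in this sketch. */
  long splitPointsRemaining() {
    return blocks.size() - consumed;
  }
}
```

A runner polling these two values could stop requesting further splits once the remaining count reaches zero, which is the kind of decision the issue describes.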
[1/2] incubator-beam git commit: Closes #833
Repository: incubator-beam Updated Branches: refs/heads/master a07648bb6 -> cf056f992 Closes #833 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf056f99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf056f99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf056f99 Branch: refs/heads/master Commit: cf056f9925a25f606b4d83b98f8182d4f31dfa95 Parents: a07648b a0361ae Author: Dan Halperin Authored: Wed Aug 17 17:45:05 2016 -0700 Committer: Dan Halperin Committed: Wed Aug 17 17:45:05 2016 -0700 -- .../beam/sdk/io/gcp/datastore/V1Beta3.java | 376 +++ .../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 88 +++-- 2 files changed, 195 insertions(+), 269 deletions(-) --
[2/2] incubator-beam git commit: DatastoreIO Sink as ParDo
DatastoreIO Sink as ParDo

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0361ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0361ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0361ae9

Branch: refs/heads/master
Commit: a0361ae99e9e39bb5ff9766508501932416129ec
Parents: a07648b
Author: Vikas Kedigehalli
Authored: Mon Aug 15 15:28:07 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:45:05 2016 -0700

--
 .../beam/sdk/io/gcp/datastore/V1Beta3.java | 376 +++
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 88 +++--
 2 files changed, 195 insertions(+), 269 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 052feb3..0d2e2cb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -30,10 +30,6 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
@@ -167,7 +163,8 @@ public class V1Beta3 {
    * Datastore has a limit of 500 mutations per batch operation, so we flush
    * changes to Datastore every 500 entities.
    */
-  private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
    * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
@@ -634,42 +631,8 @@ public class V1Beta3 {
         }
       }
     }
-
-    /**
-     * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and
-     * {@link QuerySplitter}
-     *
-     * {@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence
-     * wrapping them under this class, which implements {@link Serializable}.
-     */
-    @VisibleForTesting
-    static class V1Beta3DatastoreFactory implements Serializable {
-
-      /** Builds a Datastore client for the given pipeline options and project. */
-      public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
-        DatastoreOptions.Builder builder =
-            new DatastoreOptions.Builder()
-                .projectId(projectId)
-                .initializer(
-                    new RetryHttpRequestInitializer()
-                );
-
-        Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-        if (credential != null) {
-          builder.credential(credential);
-        }
-
-        return DatastoreFactory.get().create(builder.build());
-      }
-
-      /** Builds a Datastore {@link QuerySplitter}. */
-      public QuerySplitter getQuerySplitter() {
-        return DatastoreHelper.getQuerySplitter();
-      }
-    }
   }
 
-
   /**
    * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
    * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
@@ -705,8 +668,8 @@ public class V1Beta3 {
 
     @Override
     public PDone apply(PCollection<Entity> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
+      input.apply(ParDo.of(new DatastoreWriterFn(projectId)));
+      return PDone.in(input.getPipeline());
     }
 
     @Override
@@ -733,130 +696,127 @@ public class V1Beta3 {
           .addIfNotNull(DisplayData.item("projectId", projectId)
               .withLabel("Output Project"));
     }
-  }
 
-  /**
-   * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
-   */
-  static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
-    final String projectId;
-
-    public DatastoreSink(String projectId) {
-      this.projectId = projectId;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Override
-
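The diff above keeps the 500-mutation batch limit and moves the write path into a `ParDo`, but the body of `DatastoreWriterFn` is not visible in this excerpt. The flush-every-N idea described in the javadoc comment can be sketched in plain Java, with no Beam or Datastore dependency. The names `BatchingWriterSketch`, `add`, `finish`, and the `Consumer` commit callback are hypothetical and chosen only for illustration; this is not the code from the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: buffers elements and hands them to a commit callback
 * in batches of at most batchLimit. Not the DatastoreWriterFn from the commit.
 */
class BatchingWriterSketch<T> {
  // Same value as DATASTORE_BATCH_UPDATE_LIMIT quoted in the diff.
  static final int DEFAULT_BATCH_LIMIT = 500;

  private final int batchLimit;
  private final Consumer<List<T>> commit;
  private final List<T> buffer = new ArrayList<>();

  BatchingWriterSketch(int batchLimit, Consumer<List<T>> commit) {
    if (batchLimit <= 0) {
      throw new IllegalArgumentException("batchLimit must be positive");
    }
    this.batchLimit = batchLimit;
    this.commit = commit;
  }

  /** Buffers one element and flushes once the buffer reaches the limit. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchLimit) {
      flush();
    }
  }

  /** Flushes whatever is left; intended to be called once after the last element. */
  void finish() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    // Pass a copy so the callback may keep the batch after the buffer is cleared.
    commit.accept(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

In this sketch, feeding 1200 elements through `add` and then calling `finish` produces three commits of 500, 500, and 200 elements. A per-element function with an end-of-bundle hook would map `add` and `finish` onto those two callbacks, which is presumably why a `ParDo` can serve unbounded input where the bounded `Sink` API could not.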
[GitHub] incubator-beam pull request #833: [BEAM-550] DatastoreIO Sink as ParDo
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/833
[jira] [Commented] (BEAM-550) Datastore should support writes for Unbounded PCollections
[ https://issues.apache.org/jira/browse/BEAM-550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425670#comment-15425670 ]

ASF GitHub Bot commented on BEAM-550:

Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/833

> Datastore should support writes for Unbounded PCollections
> ---
>
> Key: BEAM-550
> URL: https://issues.apache.org/jira/browse/BEAM-550
> Project: Beam
> Issue Type: Bug
> Reporter: Vikas Kedigehalli
> Assignee: Vikas Kedigehalli
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[1/2] incubator-beam git commit: Change name of result returned by BigQueryIO.Read
Repository: incubator-beam
Updated Branches:
  refs/heads/master cf056f992 -> 7ac8d6ded

Change name of result returned by BigQueryIO.Read

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/214776e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/214776e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/214776e5

Branch: refs/heads/master
Commit: 214776e5f06da91b808c51bd3ae69a3811c30cef
Parents: cf056f9
Author: Frank Yellin
Authored: Tue Aug 16 14:40:41 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:47:26 2016 -0700

--
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/214776e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index aa168bd..ce04467 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -165,7 +165,7 @@ import javax.annotation.Nullable;
  * To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation.
  * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
  * {@code
- * PCollection<TableRow> shakespeare = pipeline.apply(
+ * PCollection<TableRow> weatherData = pipeline.apply(
  *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
  * }
  *
@@ -176,7 +176,7 @@ import javax.annotation.Nullable;
  * input transform.
  *
  * {@code
- * PCollection<TableRow> shakespeare = pipeline.apply(
+ * PCollection<TableRow> meanTemperatureData = pipeline.apply(
  *     BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
  * }
  *
[GitHub] incubator-beam pull request #840: [Beam-555] Remove cut-and-paste error from...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/840
[2/2] incubator-beam git commit: Closes #840
Closes #840

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ac8d6de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ac8d6de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ac8d6de

Branch: refs/heads/master
Commit: 7ac8d6ded34bc5f46a1ddd350a00da122693d648
Parents: cf056f9 214776e
Author: Dan Halperin
Authored: Wed Aug 17 17:47:27 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:47:27 2016 -0700

--
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--
[2/2] incubator-beam git commit: Closes #836
Closes #836

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e39bf3ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e39bf3ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e39bf3ff

Branch: refs/heads/python-sdk
Commit: e39bf3ff4df86f8f27467cf8b644db1d4ea5efe5
Parents: 6006848 44145bb
Author: Dan Halperin
Authored: Wed Aug 17 17:48:48 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:48:48 2016 -0700

--
 pom.xml | 1 +
 sdks/python/setup.cfg | 2 --
 sdks/python/tox.ini | 3 ---
 3 files changed, 1 insertion(+), 5 deletions(-)
--
[1/2] incubator-beam git commit: Remove egg_info from setup.cfg
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 600684865 -> e39bf3ff4

Remove egg_info from setup.cfg

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44145bbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44145bbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44145bbe

Branch: refs/heads/python-sdk
Commit: 44145bbe02d6cea5c8ecdf7bdcd203f1e54b293e
Parents: 6006848
Author: Ahmet Altay
Authored: Tue Aug 16 09:53:00 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:48:47 2016 -0700

--
 pom.xml | 1 +
 sdks/python/setup.cfg | 2 --
 sdks/python/tox.ini | 3 ---
 3 files changed, 1 insertion(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44145bbe/pom.xml
--
diff --git a/pom.xml b/pom.xml
index afe24ee..d509167 100644
--- a/pom.xml
+++ b/pom.xml
@@ -795,6 +795,7 @@
             <exclude>**/test/**/.placeholder</exclude>
             <exclude>.repository/**/*</exclude>
             <exclude>**/nose-*.egg/**/*</exclude>
+            <exclude>**/.tox/**/*</exclude>
             <exclude>**/.checkstyle</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44145bbe/sdks/python/setup.cfg
--
diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg
index fcfe003..547a74b 100644
--- a/sdks/python/setup.cfg
+++ b/sdks/python/setup.cfg
@@ -26,5 +26,3 @@ verbosity=2
 # fast_coders_test and typecoders_test.
 exclude=fast_coders_test|typecoders_test
 
-[egg_info]
-egg_base = target

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44145bbe/sdks/python/tox.ini
--
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index cba9626..5a2572e 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -17,9 +17,6 @@
 
 [tox]
 envlist = py27
-toxworkdir={toxinidir}/target/tox
-distdir={toxinidir}/target/dist
-distshare={toxinidir}/target/distshare
 
 [pep8]
 # Disable all errors and warnings except for the ones related to blank lines.
[GitHub] incubator-beam pull request #835: Fix NPE in BigQueryIO.TransformingReader w...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/835
[1/2] incubator-beam git commit: Closes #835
Repository: incubator-beam
Updated Branches:
  refs/heads/master 7ac8d6ded -> bfa3b70ab

Closes #835

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa3b70a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa3b70a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa3b70a

Branch: refs/heads/master
Commit: bfa3b70ab63c730a320d825ab9f2f93fee748a1c
Parents: 7ac8d6d 2c8a654
Author: Dan Halperin
Authored: Wed Aug 17 17:50:07 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:50:07 2016 -0700

--
 .../beam/sdk/testing/SourceTestUtils.java | 132 +++
 .../beam/sdk/testing/SourceTestUtilsTest.java | 66 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 31 +
 4 files changed, 235 insertions(+), 6 deletions(-)
--
[2/2] incubator-beam git commit: Fix NPE in BigQueryIO.TransformingReader
Fix NPE in BigQueryIO.TransformingReader

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8a6546
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8a6546
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8a6546

Branch: refs/heads/master
Commit: 2c8a6546af2adb1f7694f29a092338898f851d16
Parents: 7ac8d6d
Author: Pei He
Authored: Mon Aug 15 17:23:20 2016 -0700
Committer: Dan Halperin
Committed: Wed Aug 17 17:50:07 2016 -0700

--
 .../beam/sdk/testing/SourceTestUtils.java | 132 +++
 .../beam/sdk/testing/SourceTestUtilsTest.java | 66 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 31 +
 4 files changed, 235 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index e0b8890..9ce9c5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -27,10 +29,15 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -45,6 +53,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import javax.annotation.Nullable;
+
 /**
  * Helper functions and test harnesses for checking correctness of {@link Source}
  * implementations.
@@ -673,4 +683,126 @@ public class SourceTestUtils {
             numItemsToReadBeforeSplitting, fraction, options);
     return (res.numResidualItems > 0);
   }
+
+  /**
+   * Returns an equivalent unsplittable {@code BoundedSource<T>}.
+   *
+   * <p>It forwards most methods to the given {@code boundedSource}, except:
+   * <ol>
+   * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting
+   * by returning itself in a list.
+   * <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.
+   * </ol>
+   */
+  public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) {
+    return new UnsplittableSource<>(boundedSource);
+  }
+
+  private static class UnsplittableSource<T> extends BoundedSource<T> {
+
+    private final BoundedSource<T> boundedSource;
+
+    private UnsplittableSource(BoundedSource<T> boundedSource) {
+      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      this.boundedSource.populateDisplayData(builder);
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return boundedSource.getEstimatedSizeBytes(options);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return boundedSource.producesSortedKeys(options);
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+      return new UnsplittableReader<>(boundedSource, boundedSource.createReader(options));
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return bou
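The `toUnsplittableSource` javadoc above describes a forwarding wrapper that delegates everything except splitting. The same pattern can be sketched without Beam on the classpath. `SplittableSketch` and `UnsplittableSketch` below are hypothetical stand-ins invented for this illustration; they are not Beam's `BoundedSource` API and cover only the initial-split half of the behavior (the reader-side `splitAtFraction` rejection is omitted).

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a splittable source; not Beam's BoundedSource. */
interface SplittableSketch {
  long estimatedSizeBytes();

  List<SplittableSketch> split(long desiredBundleSizeBytes);
}

/** Forwards size estimation to the wrapped source but refuses to split. */
final class UnsplittableSketch implements SplittableSketch {
  private final SplittableSketch delegate;

  UnsplittableSketch(SplittableSketch delegate) {
    this.delegate = Objects.requireNonNull(delegate, "delegate");
  }

  @Override
  public long estimatedSizeBytes() {
    // Plain forwarding: the wrapper adds no behavior here.
    return delegate.estimatedSizeBytes();
  }

  @Override
  public List<SplittableSketch> split(long desiredBundleSizeBytes) {
    // Reject initial splitting by returning this wrapper as the only bundle.
    return Collections.<SplittableSketch>singletonList(this);
  }
}
```

Returning the wrapper itself, rather than the delegate, matters: a caller that goes on to read from the single returned bundle still goes through the wrapper, so any later split attempt is rejected as well.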
[GitHub] incubator-beam pull request #836: Remove egg_info from setup.cfg
Github user aaltay closed the pull request at: https://github.com/apache/incubator-beam/pull/836
[jira] [Resolved] (BEAM-550) Datastore should support writes for Unbounded PCollections
[ https://issues.apache.org/jira/browse/BEAM-550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vikas Kedigehalli resolved BEAM-550.

Resolution: Fixed
Fix Version/s: 0.3.0-incubating

> Datastore should support writes for Unbounded PCollections
> ---
>
> Key: BEAM-550
> URL: https://issues.apache.org/jira/browse/BEAM-550
> Project: Beam
> Issue Type: Bug
> Reporter: Vikas Kedigehalli
> Assignee: Vikas Kedigehalli
> Fix For: 0.3.0-incubating
>
[jira] [Created] (BEAM-565) Datastore Sink should support deletes
Vikas Kedigehalli created BEAM-565:

Summary: Datastore Sink should support deletes
Key: BEAM-565
URL: https://issues.apache.org/jira/browse/BEAM-565
Project: Beam
Issue Type: Improvement
Reporter: Vikas Kedigehalli
Assignee: Vikas Kedigehalli
[jira] [Commented] (BEAM-565) Datastore Sink should support deletes
[ https://issues.apache.org/jira/browse/BEAM-565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425788#comment-15425788 ]

ASF GitHub Bot commented on BEAM-565:

GitHub user vikkyrk opened a pull request:

    https://github.com/apache/incubator-beam/pull/845

    [BEAM-565] Datastore Sink support for writing Mutations

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vikkyrk/incubator-beam vikasrk/ds_mutation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/845.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #845

commit ffcb014a5ad2c47585d48c7038f82c9c0530d726
Author: Vikas Kedigehalli
Date: 2016-08-18T01:19:52Z

    Datastore Sink support for writing Mutations

> Datastore Sink should support deletes
> -
>
> Key: BEAM-565
> URL: https://issues.apache.org/jira/browse/BEAM-565
> Project: Beam
> Issue Type: Improvement
> Reporter: Vikas Kedigehalli
> Assignee: Vikas Kedigehalli
>
[GitHub] incubator-beam pull request #845: [BEAM-565] Datastore Sink support for writ...
GitHub user vikkyrk opened a pull request:

    https://github.com/apache/incubator-beam/pull/845

    [BEAM-565] Datastore Sink support for writing Mutations

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vikkyrk/incubator-beam vikasrk/ds_mutation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/845.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #845

commit ffcb014a5ad2c47585d48c7038f82c9c0530d726
Author: Vikas Kedigehalli
Date: 2016-08-18T01:19:52Z

    Datastore Sink support for writing Mutations