[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=90921=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90921
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 13/Apr/18 18:15
Start Date: 13/Apr/18 18:15
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5faf95f8853..04147ba9bf5 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -231,6 +231,7 @@
 org.apache.beam.sdk.testing.UsesSplittableParDo,
 org.apache.beam.sdk.testing.UsesAttemptedMetrics,
 org.apache.beam.sdk.testing.UsesCommittedMetrics,
+org.apache.beam.sdk.testing.UsesImpulse,
 org.apache.beam.sdk.testing.UsesTestStream
   
   none
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
index 40e08360087..128b826ecb0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
@@ -64,6 +64,8 @@ private static PTransformMatcher boundedMatcher() {
 ReadTranslation.sourceIsBounded(transform) == PCollection.IsBounded.BOUNDED);
   }
 
+  // TODO: https://issues.apache.org/jira/browse/BEAM-3859 Support unbounded reads via impulse.
+
   private static class BoundedReadViaImpulse extends PTransform {
 private final BoundedSource source;
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 35e079151bd..abbba2a91a9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -70,13 +70,13 @@ private GreedyPipelineFuser(Pipeline p) {
   }
 
   /**
-   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage ExecutableStages}.
    *
    * This fuser expects each ExecutableStage to have exactly one input. This means that pipelines
    * must be rooted at Impulse, or other runner-executed primitive transforms, instead of primitive
    * Read nodes. The utilities in
-   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate
-   * non-compliant pipelines.
+   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to convert
+   * bounded pipelines using the Read primitive.
    */
   public static FusedPipeline fuse(Pipeline p) {
 GreedyPipelineFuser fuser = new GreedyPipelineFuser(p);
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index d6673ae246d..b633f7c060a 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -104,6 +104,7 @@ def createValidatesRunnerTask(Map m) {
 excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
 excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
 excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
 excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
   }
 } else {
@@ -111,8 +112,8 @@ def createValidatesRunnerTask(Map m) {
 includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
 excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
 excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
-excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDo'
 excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDo'
 excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
   }
 }
diff --git 

[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=90889=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90889
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 13/Apr/18 16:54
Start Date: 13/Apr/18 16:54
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#issuecomment-381196752
 
 
   It looks like the precommits that are actually being run are passing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 90889)
Time Spent: 4h 40m  (was: 4.5h)

> Flink supports chaining/fusion of single-SDK stages
> ---
>
> Key: BEAM-2898
> URL: https://issues.apache.org/jira/browse/BEAM-2898
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Henning Rohde
> Priority: Major
> Labels: portability
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> The Fn API supports fused stages, which avoids unnecessarily round-tripping 
> the data over the Fn API between stages. The Flink runner should use that 
> capability for better performance.
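The description's claim can be made concrete with a toy model of why fusion helps. This is a minimal sketch, not Beam's Fn API or the Flink runner's implementation. `Channel` is a hypothetical stand-in that simply counts runner-to-SDK round trips, and each stage is an element-wise function. Without fusion, every stage costs one round trip. With fusion, the stages are composed on the SDK side and the element crosses the boundary once.

```java
import java.util.List;
import java.util.function.Function;

final class FusionSketch {
  /** Hypothetical stand-in for the runner <-> SDK harness data channel; counts round trips. */
  static final class Channel {
    int roundTrips = 0;

    <T> T roundTrip(T value, Function<T, T> work) {
      roundTrips++; // one send plus one receive over the simulated data plane
      return work.apply(value);
    }
  }

  /** Unfused: every stage ships the element to the SDK and back. */
  static <T> T runUnfused(T input, List<Function<T, T>> stages, Channel channel) {
    T value = input;
    for (Function<T, T> stage : stages) {
      value = channel.roundTrip(value, stage);
    }
    return value;
  }

  /** Fused: compose the stages into one function and ship the element once. */
  static <T> T runFused(T input, List<Function<T, T>> stages, Channel channel) {
    Function<T, T> fused = Function.identity();
    for (Function<T, T> stage : stages) {
      fused = fused.andThen(stage);
    }
    return channel.roundTrip(input, fused);
  }
}
```

Both paths return the same value. Only the number of simulated transfers differs, and that overhead is what the issue asks the Flink runner to avoid.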



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=90617=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90617
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 12/Apr/18 22:42
Start Date: 12/Apr/18 22:42
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#issuecomment-380966483
 
 
   run java precommit


Issue Time Tracking
---

Worklog Id: (was: 90617)
Time Spent: 4.5h  (was: 4h 20m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=90231=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90231
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 11/Apr/18 23:56
Start Date: 11/Apr/18 23:56
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#issuecomment-380631588
 
 
   run java precommit


Issue Time Tracking
---

Worklog Id: (was: 90231)
Time Spent: 4h 20m  (was: 4h 10m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86827=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86827
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:57
Start Date: 02/Apr/18 22:57
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#issuecomment-378072228
 
 
   Review comments addressed. PTAL.


Issue Time Tracking
---

Worklog Id: (was: 86827)
Time Spent: 4h 10m  (was: 4h)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86825=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86825
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:44
Start Date: 02/Apr/18 22:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r178669947
 
 

 ##
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
 ##
 @@ -89,14 +96,50 @@ public void testSplitSourceFn() {
   public void testReadFromSourceFn() {
     BoundedSource source = CountingSource.upTo(10L);
     PCollection sourcePC =
-        (PCollection)
-            p.apply(Create.of(source).withCoder(SerializableCoder.of((Class) BoundedSource.class)));
-    PCollection elems = sourcePC.apply(ParDo.of(new ReadFromBoundedSourceFn<>()));
+        p.apply(Create.of(source)
+            .withCoder(new JavaReadViaImpulse.BoundedSourceCoder<>()));
+    PCollection elems = sourcePC.apply(ParDo.of(new ReadFromBoundedSourceFn<>()))
+        .setCoder(VarLongCoder.of());
 
     PAssert.that(elems).containsInAnyOrder(0L, 9L, 8L, 1L, 2L, 7L, 6L, 3L, 4L, 5L);
     p.run();
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadToImpulseOverride() {
+    BoundedSource source = CountingSource.upTo(10L);
+    // Use an explicit read transform to ensure the override is exercised.
+    PCollection input = p.apply(Read.from(source));
+    PAssert.that(input).containsInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+    p.traverseTopologically(new Pipeline.PipelineVisitor() {
 
 Review comment:
   This is actually already in master and was just one of the dependent changes.
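The bounded override exercised by this test replaces `Read.from(source)` with an Impulse root, followed by DoFns that split the source and then read each split. See the hunk's `testSplitSourceFn` and `ReadFromBoundedSourceFn`. Below is a plain-Java analogy of that expansion, written as a sketch under stated assumptions. `RangeSource` is a hypothetical stand-in for `CountingSource`, and lists stand in for PCollections. Coders, windowing and any redistribution between the steps are omitted.

```java
import java.util.ArrayList;
import java.util.List;

final class ReadViaImpulseSketch {
  /** Hypothetical stand-in for a bounded source producing the longs in [start, end). */
  static final class RangeSource {
    final long start;
    final long end;

    RangeSource(long start, long end) {
      this.start = start;
      this.end = end;
    }

    /** Splits the range into sub-sources of at most bundleSize elements. */
    List<RangeSource> split(long bundleSize) {
      List<RangeSource> parts = new ArrayList<>();
      for (long s = start; s < end; s += bundleSize) {
        parts.add(new RangeSource(s, Math.min(s + bundleSize, end)));
      }
      return parts;
    }

    /** Reads every element of this (sub-)source. */
    List<Long> read() {
      List<Long> out = new ArrayList<>();
      for (long v = start; v < end; v++) {
        out.add(v);
      }
      return out;
    }
  }

  /** Impulse -> split -> read, modelled with plain lists instead of PCollections. */
  static List<Long> readViaImpulse(RangeSource source, long bundleSize) {
    // The impulse is a single empty element: it carries no data and only triggers work.
    List<byte[]> impulse = List.of(new byte[0]);

    // "Split" step: for each impulse element, emit the splits of the captured source.
    List<RangeSource> splits = new ArrayList<>();
    for (byte[] ignored : impulse) {
      splits.addAll(source.split(bundleSize));
    }

    // "Read" step: read each split independently (a runner could do these in parallel).
    List<Long> elements = new ArrayList<>();
    for (RangeSource split : splits) {
      elements.addAll(split.read());
    }
    return elements;
  }
}
```

Because the splits may be read in any order, the test asserts with `containsInAnyOrder` rather than an ordered comparison.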


Issue Time Tracking
---

Worklog Id: (was: 86825)
Time Spent: 3h 50m  (was: 3h 40m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86824=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86824
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:44
Start Date: 02/Apr/18 22:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r178670179
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/ImpulseInputFormat.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+/** Flink input format that implements impulses. */
+public class ImpulseInputFormat extends RichInputFormat<WindowedValue<byte[]>, GenericInputSplit> {
+
+  // Whether the input format has remaining output that has not yet been read.
+  private boolean availableOutput = false;
+
+  public ImpulseInputFormat() {}
+
+  @Override
+  public void configure(Configuration configuration) {
+    // Do nothing.
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
+    return new BaseStatistics() {
+      @Override
+      public long getTotalInputSize() {
+        return 1;
+      }
+
+      @Override
+      public long getNumberOfRecords() {
+        return 1;
+      }
+
+      @Override
+      public float getAverageRecordWidth() {
+        return 1;
+      }
+    };
+  }
+
+  @Override
+  public GenericInputSplit[] createInputSplits(int numSplits) {
+    // Always return a single split because only one global "impulse" will ever be sent.
+    return new GenericInputSplit[]{new GenericInputSplit(1, 1)};
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] genericInputSplits) {
+    return new DefaultInputSplitAssigner(genericInputSplits);
+  }
+
+  @Override
+  public void open(GenericInputSplit genericInputSplit) {
+    availableOutput = true;
+  }
+
+  @Override
+  public boolean reachedEnd() {
+    return !availableOutput;
+  }
+
+  @Override
+  public WindowedValue<byte[]> nextRecord(WindowedValue<byte[]> windowedValue) {
+    availableOutput = false;
 
 Review comment:
   Done.
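`createInputSplits` ignores `numSplits` and always returns a single split. As a result, only one reader is opened regardless of parallelism, and `open`/`reachedEnd`/`nextRecord` then yield exactly one element. Below is a dependency-free sketch of that lifecycle. `OneShotReader` and `countEmitted` are hypothetical mirrors, not Flink's `InputFormat` API. The driver loop only approximates how a Flink batch source task consumes a format: open the split, then call `nextRecord` until `reachedEnd` returns true. The empty byte-array payload is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class SingleSplitSketch {
  /** Mirrors createInputSplits: the requested split count is ignored; one split is returned. */
  static List<Integer> createInputSplits(int numSplits) {
    List<Integer> splits = new ArrayList<>();
    splits.add(0); // a single split, so only one reader ever emits the impulse
    return splits;
  }

  /** Hypothetical mirror of ImpulseInputFormat's open/reachedEnd/nextRecord state machine. */
  static final class OneShotReader {
    private boolean availableOutput = false;

    void open(int split) {
      availableOutput = true;
    }

    boolean reachedEnd() {
      return !availableOutput;
    }

    byte[] nextRecord() {
      availableOutput = false;
      return new byte[0]; // assumed empty payload for the impulse element
    }
  }

  /** Drives all splits roughly the way a batch source task would and counts emitted records. */
  static int countEmitted(int parallelism) {
    int emitted = 0;
    for (int split : createInputSplits(parallelism)) {
      OneShotReader reader = new OneShotReader();
      reader.open(split);
      while (!reader.reachedEnd()) {
        reader.nextRecord();
        emitted++;
      }
    }
    return emitted;
  }
}
```

With a parallelism of 1 or 8, `countEmitted` returns 1 either way. This is the "one global impulse" guarantee the code comment describes.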


Issue Time Tracking
---

Worklog Id: (was: 86824)
Time Spent: 3h 40m  (was: 3.5h)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86822=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86822
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:44
Start Date: 02/Apr/18 22:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r178669196
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 ##
 @@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) {
 fusePipeline(groupSiblings(rootConsumers));
   }
 
+  /**
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
 
 Review comment:
   Done.


Issue Time Tracking
---

Worklog Id: (was: 86822)
Time Spent: 3h 20m  (was: 3h 10m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86826=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86826
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:44
Start Date: 02/Apr/18 22:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r178670010
 
 

 ##
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
 ##
 @@ -106,6 +149,13 @@ public void testOutputCoder() {
 equalTo(BigEndianIntegerCoder.of()));
   }
 
+  private static void assertNotReadTransform(PTransform transform) {
 
 Review comment:
   As above, this is already submitted, though I could change it here if you want.


Issue Time Tracking
---

Worklog Id: (was: 86826)
Time Spent: 4h  (was: 3h 50m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86821=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86821
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:44
Start Date: 02/Apr/18 22:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r178669051
 
 

 ##
 File path: sdks/java/core/build.gradle
 ##
 @@ -66,6 +66,7 @@ dependencies {
   shadow library.java.junit
   shadow "org.tukaani:xz:1.5"
   shadowTest project(":model:fn-execution").sourceSets.test.output
+  shadowTest project(path: ":runners:core-construction-java", configuration: "shadow")
 
 Review comment:
   This isn't needed anymore. Thanks for pointing it out.


Issue Time Tracking
---

Worklog Id: (was: 86821)
Time Spent: 3h 20m  (was: 3h 10m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=86823=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86823
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:44
Start Date: 02/Apr/18 22:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r178669579
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 ##
 @@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) {
 fusePipeline(groupSiblings(rootConsumers));
   }
 
+  /**
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
+   *
+   * This fuser expects each ExecutableStage to have exactly one input. This means that pipelines
+   * must be rooted at Impulse, or other runner-executed primitive transforms, instead of primitive
+   * Read nodes. The utilities in
+   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate
+   * non-compliant pipelines.
 
 Review comment:
   Done.
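The requirement in this Javadoc can be read as a pre-fusion check. A root transform (one with no inputs) is acceptable only if the runner executes it itself, like Impulse, and not if an SDK environment does, like a primitive Read. Below is a minimal sketch, assuming a hypothetical `Node` type in which a null `environmentId` means "runner-executed". This matches tgroh's review wording that inputless transforms should have no associated environment. It is not the fuser's actual validation code.

```java
import java.util.ArrayList;
import java.util.List;

final class RootCheckSketch {
  /** Hypothetical, simplified transform node; not Beam's RunnerApi.PTransform. */
  static final class Node {
    final String name;
    final int numInputs;
    final String environmentId; // null means the runner itself executes the transform

    Node(String name, int numInputs, String environmentId) {
      this.name = name;
      this.numInputs = numInputs;
      this.environmentId = environmentId;
    }
  }

  /** Returns the names of root transforms (no inputs) that would be executed by an SDK. */
  static List<String> nonCompliantRoots(List<Node> nodes) {
    List<String> offenders = new ArrayList<>();
    for (Node node : nodes) {
      boolean isRoot = node.numInputs == 0;
      if (isRoot && node.environmentId != null) {
        // e.g. a primitive Read; such pipelines would first be converted via JavaReadViaImpulse
        offenders.add(node.name);
      }
    }
    return offenders;
  }
}
```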


Issue Time Tracking
---

Worklog Id: (was: 86823)
Time Spent: 3.5h  (was: 3h 20m)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80946
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r174896008
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/ImpulseInputFormat.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+/** Flink input format that implements impulses. */
+public class ImpulseInputFormat extends RichInputFormat<WindowedValue<byte[]>, GenericInputSplit> {
+
+  // Whether the input format has remaining output that has not yet been read.
+  private boolean availableOutput = false;
+
+  public ImpulseInputFormat() {}
+
+  @Override
+  public void configure(Configuration configuration) {
+    // Do nothing.
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
+    return new BaseStatistics() {
+      @Override
+      public long getTotalInputSize() {
+        return 1;
+      }
+
+      @Override
+      public long getNumberOfRecords() {
+        return 1;
+      }
+
+      @Override
+      public float getAverageRecordWidth() {
+        return 1;
+      }
+    };
+  }
+
+  @Override
+  public GenericInputSplit[] createInputSplits(int numSplits) {
+    // Always return a single split because only one global "impulse" will ever be sent.
+    return new GenericInputSplit[]{new GenericInputSplit(1, 1)};
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] genericInputSplits) {
+    return new DefaultInputSplitAssigner(genericInputSplits);
+  }
+
+  @Override
+  public void open(GenericInputSplit genericInputSplit) {
+    availableOutput = true;
+  }
+
+  @Override
+  public boolean reachedEnd() {
+    return !availableOutput;
+  }
+
+  @Override
+  public WindowedValue<byte[]> nextRecord(WindowedValue<byte[]> windowedValue) {
+    availableOutput = false;
 
 Review comment:
   This could be a `checkArgument` before you mutate the state (minor preference, no strong feelings)
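tgroh's suggestion is to validate state before `nextRecord` clears `availableOutput`. In Beam this would presumably be a Guava precondition. Guava's `checkArgument` throws `IllegalArgumentException`, whereas `checkState` throws `IllegalStateException`, so `checkState` arguably fits a guard on the object's own state better. Below is a dependency-free sketch of the check-then-mutate pattern. It uses a hypothetical `CheckedImpulseReader` rather than the real `ImpulseInputFormat`, whose final `nextRecord` body is not shown in this thread, and it assumes the impulse payload is an empty byte array.

```java
final class CheckedImpulseReader {
  // Whether the single impulse element is still waiting to be read.
  private boolean availableOutput = false;

  void open() {
    availableOutput = true;
  }

  boolean reachedEnd() {
    return !availableOutput;
  }

  byte[] nextRecord() {
    // Check first, mutate second: a call after the element was consumed is a caller bug,
    // so fail fast instead of silently emitting a second impulse.
    if (!availableOutput) {
      throw new IllegalStateException("nextRecord() called after the impulse was emitted");
    }
    availableOutput = false;
    return new byte[0]; // assumed empty payload for the impulse element
  }
}
```

Because the guard runs before the field is cleared, a failed call leaves the reader's state unchanged.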


Issue Time Tracking
---

Worklog Id: (was: 80946)
Time Spent: 3h 10m  (was: 3h)


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80939=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80939
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: [BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r173033660
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 ##
 @@ -72,10 +72,10 @@ private GreedyPipelineFuser(Pipeline p) {
   /**
    * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
    *
-   * This fuser expects each PTransform to have exactly one input. This means that pipelines must
-   * use Impulse/ParDo transformations rather than read nodes. The utilities in
-   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate
-   * non-compliant pipelines.
+   * This fuser expects each PTransform which has no inputs to have an associated environment.
 
 Review comment:
   s/an associated environment/no associated environment
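With the review's correction applied, the rule is that a PTransform with no inputs should have no associated environment. Such a root is executed by the runner rather than an SDK.

A check for that rule, run before fusion, could look like the sketch below. It assumes:
- `TransformNode` and `RootEnvironmentCheck` are hypothetical names, not Beam's pipeline protos.
- A primitive `Read` is treated as SDK-executed purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical graph node: transform name, its input PCollection names, and an optional SDK environment. */
final class TransformNode {
  final String name;
  final List<String> inputs;
  final Optional<String> environment;

  TransformNode(String name, List<String> inputs, Optional<String> environment) {
    this.name = name;
    this.inputs = inputs;
    this.environment = environment;
  }
}

final class RootEnvironmentCheck {
  /** Returns the roots (transforms with no inputs) that declare an SDK environment, i.e. violate the rule. */
  static List<String> sdkExecutedRoots(List<TransformNode> transforms) {
    List<String> violations = new ArrayList<>();
    for (TransformNode t : transforms) {
      if (t.inputs.isEmpty() && t.environment.isPresent()) {
        violations.add(t.name);
      }
    }
    return violations;
  }
}
```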


Issue Time Tracking
---

Worklog Id: (was: 80939)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80945=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80945
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r174896407
 
 

 ##
 File path: sdks/java/core/build.gradle
 ##
 @@ -66,6 +66,7 @@ dependencies {
   shadow library.java.junit
   shadow "org.tukaani:xz:1.5"
   shadowTest project(":model:fn-execution").sourceSets.test.output
+  shadowTest project(path: ":runners:core-construction-java", configuration: "shadow")
 
 Review comment:
   What introduced this edge?
   
   This also can't be represented in maven, and right now that worries me 
(though hopefully less soon)


Issue Time Tracking
---

Worklog Id: (was: 80945)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80944=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80944
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r174893743
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 ##
 @@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) {
 fusePipeline(groupSiblings(rootConsumers));
   }
 
+  /**
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
+   *
+   * This fuser expects each ExecutableStage to have exactly one input. This means that pipelines
+   * must be rooted at Impulse, or other runner-executed primitive transforms, instead of primitive
+   * Read nodes. The utilities in
+   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate
+   * non-compliant pipelines.
 
 Review comment:
   This does kind of have an associated `TODO` for unbounded reads; 
https://issues.apache.org/jira/browse/BEAM-3859 is the (just-authored) issue to 
link against.
   
   'can be used to translate non-compliant pipelines -> can be used to convert 
bounded pipelines using the `Read` primitive.'
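Here is a rough sketch of what rooting a bounded read at Impulse means. Impulse, a runner-executed root, emits one empty element. SDK-side steps then turn that element into the source, split it, and read each split.

This is a sketch under stated assumptions, not the actual `JavaReadViaImpulse` implementation:
- The Impulse, split, read shape is inferred from names in this PR (`JavaReadViaImpulse`, `ReadFromBoundedSourceFn`, `testSplitSourceFn`).
- `ToySource`, `ToyCountingSource`, and `ReadViaImpulseSketch` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a bounded source that can split itself and read its elements. */
interface ToySource<T> {
  List<ToySource<T>> split(int desiredBundles);

  List<T> read();
}

/** Counts the half-open range [from, to); a toy analogue of a counting source. */
final class ToyCountingSource implements ToySource<Long> {
  final long from;
  final long to;

  ToyCountingSource(long from, long to) {
    this.from = from;
    this.to = to;
  }

  @Override
  public List<ToySource<Long>> split(int desiredBundles) {
    List<ToySource<Long>> parts = new ArrayList<>();
    // Ceiling division so that at most desiredBundles parts are produced.
    long step = Math.max(1, (to - from + desiredBundles - 1) / desiredBundles);
    for (long start = from; start < to; start += step) {
      parts.add(new ToyCountingSource(start, Math.min(to, start + step)));
    }
    return parts;
  }

  @Override
  public List<Long> read() {
    List<Long> out = new ArrayList<>();
    for (long i = from; i < to; i++) {
      out.add(i);
    }
    return out;
  }
}

final class ReadViaImpulseSketch {
  static <T> List<T> expand(ToySource<T> source, int desiredBundles) {
    // Impulse: the runner-executed root emits exactly one empty byte[] element.
    List<byte[]> impulse = Collections.singletonList(new byte[0]);
    // First SDK step: map the impulse element to the source to be read.
    List<ToySource<T>> sources = new ArrayList<>();
    for (byte[] ignored : impulse) {
      sources.add(source);
    }
    // Split each source into sub-sources.
    List<ToySource<T>> splits = new ArrayList<>();
    for (ToySource<T> s : sources) {
      splits.addAll(s.split(desiredBundles));
    }
    // Read every sub-source.
    List<T> out = new ArrayList<>();
    for (ToySource<T> s : splits) {
      out.addAll(s.read());
    }
    return out;
  }
}
```

With `new ToyCountingSource(0, 10)` and 3 desired bundles, the source splits into [0, 4), [4, 8) and [8, 10). The expansion then yields 0 through 9 in order.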


Issue Time Tracking
---

Worklog Id: (was: 80944)
Time Spent: 2h 50m


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80941=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80941
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r174893625
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 ##
 @@ -69,6 +69,15 @@ private GreedyPipelineFuser(Pipeline p) {
 fusePipeline(groupSiblings(rootConsumers));
   }
 
+  /**
+   * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
 
 Review comment:
   `{@link ExecutableStage ExecutableStages}` is our normal style.


Issue Time Tracking
---

Worklog Id: (was: 80941)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80940=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80940
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r173033901
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
 ##
 @@ -72,10 +72,10 @@ private GreedyPipelineFuser(Pipeline p) {
   /**
    * Fuses a {@link Pipeline} into a collection of {@link ExecutableStage}s.
    *
-   * This fuser expects each PTransform to have exactly one input. This means that pipelines must
-   * use Impulse/ParDo transformations rather than read nodes. The utilities in
-   * {@link org.apache.beam.runners.core.construction.JavaReadViaImpulse} can be used to translate
-   * non-compliant pipelines.
+   * This fuser expects each PTransform which has no inputs to have an associated environment.
+   * This means that pipelines must use Impulse/ParDo transformations rather than read nodes. The
 
 Review comment:
   "This means that pipelines must be rooted at Impulse, or other 
runner-executed primitive transforms, instead of primitive Read nodes."


Issue Time Tracking
---

Worklog Id: (was: 80940)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80942=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80942
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r174895596
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
 ##
 @@ -89,14 +96,50 @@ public void testSplitSourceFn() {
   public void testReadFromSourceFn() {
 BoundedSource source = CountingSource.upTo(10L);
 PCollection sourcePC =
-(PCollection)
-p.apply(Create.of(source).withCoder(SerializableCoder.of((Class) BoundedSource.class)));
-PCollection elems = sourcePC.apply(ParDo.of(new ReadFromBoundedSourceFn<>()));
+  p.apply(Create.of(source)
+  .withCoder(new JavaReadViaImpulse.BoundedSourceCoder<>()));
+PCollection elems = sourcePC.apply(ParDo.of(new ReadFromBoundedSourceFn<>()))
+.setCoder(VarLongCoder.of());
 
 PAssert.that(elems).containsInAnyOrder(0L, 9L, 8L, 1L, 2L, 7L, 6L, 3L, 4L, 5L);
 p.run();
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadToImpulseOverride() {
+BoundedSource source = CountingSource.upTo(10L);
+// Use an explicit read transform to ensure the override is exercised.
+PCollection input = p.apply(Read.from(source));
+PAssert.that(input).containsInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+p.traverseTopologically(new Pipeline.PipelineVisitor() {
 
 Review comment:
   extend PipelineVisitor.Defaults, and get rid of `enterPipeline`, 
`visitValue`, and `leave*`
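Extending a `Defaults` class helps because it supplies a no-op implementation of every callback. The anonymous visitor then overrides only the callback it needs, and a helper check can be inlined into it.

The toy below shows the pattern with hypothetical types (`ToyVisitor`, `ToyRead`, `VisitorDemo`). Its method names only loosely mirror Beam's `Pipeline.PipelineVisitor`, whose real callbacks receive pipeline graph nodes rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down visitor; callback names only loosely mirror Beam's PipelineVisitor. */
interface ToyVisitor {
  void enterPipeline();

  void visitPrimitive(String name, Class<?> transformClass);

  void visitValue(String name);

  void leavePipeline();

  /** No-op defaults, so a visitor overrides only the callbacks it cares about. */
  class Defaults implements ToyVisitor {
    @Override public void enterPipeline() {}

    @Override public void visitPrimitive(String name, Class<?> transformClass) {}

    @Override public void visitValue(String name) {}

    @Override public void leavePipeline() {}
  }
}

// Hypothetical marker classes standing in for transform types.
final class ToyImpulse {}

final class ToyParDo {}

final class ToyRead {}

final class VisitorDemo {
  /** Traverses a flat list of primitives and returns the names of any ToyRead transforms. */
  static List<String> findReads(List<String> names, List<Class<?>> classes) {
    List<String> reads = new ArrayList<>();
    ToyVisitor visitor =
        new ToyVisitor.Defaults() {
          // Only this callback is overridden; the check is inlined rather than kept in a helper.
          @Override
          public void visitPrimitive(String name, Class<?> transformClass) {
            if (transformClass == ToyRead.class) {
              reads.add(name);
            }
          }
        };
    visitor.enterPipeline();
    for (int i = 0; i < names.size(); i++) {
      visitor.visitPrimitive(names.get(i), classes.get(i));
      visitor.visitValue(names.get(i) + ".out");
    }
    visitor.leavePipeline();
    return reads;
  }
}
```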


Issue Time Tracking
---

Worklog Id: (was: 80942)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80943=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80943
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r174895498
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/JavaReadViaImpulseTest.java
 ##
 @@ -106,6 +149,13 @@ public void testOutputCoder() {
 equalTo(BigEndianIntegerCoder.of()));
   }
 
+  private static void assertNotReadTransform(PTransform transform) {
 
 Review comment:
   This could be inlined into the anonymous subclass that you're traversing the pipeline with


Issue Time Tracking
---

Worklog Id: (was: 80943)
Time Spent: 2h 50m


[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80938=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80938
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 19:01
Start Date: 15/Mar/18 19:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4783: 
[BEAM-2898] Support Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#discussion_r173033558
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/JavaReadViaImpulse.java
 ##
 @@ -143,9 +143,10 @@ public void readSoruce(ProcessContext ctxt) throws IOException {
    * A {@link Coder} for {@link BoundedSource}s that wraps a {@link SerializableCoder}. We cannot
    * safely use an unwrapped SerializableCoder because
    * {@link SerializableCoder#structuralValue(Serializable)} assumes that coded elements support
-   * object equality. By default, Coders compare equality by serialized bytes, which we want in
-   * this case. It is usually safe to depend on coded representation here because we only compare
-   * objects on bundle commit, which compares serializations of the same object instance.
+   * object equality. Sources in general do not support object equality. By default, Coders compare
 
 Review comment:
   Though with https://github.com/apache/beam/pull/4817 I'm no longer hugely 
concerned about the coder stuff. I wouldn't revert it, however - certainly, the 
`StructuralValue` is better, even if it's not going to be used anywhere 
relevant, and the comment seems sufficient for the time being.
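The coder comment argues for comparing encoded bytes rather than relying on `equals()`, which many sources do not override. Below is a minimal sketch of that idea. It assumes plain Java serialization as the encoding, and `ToyRangeSource` and `EncodedStructuralValue` are hypothetical, not Beam's `Coder#structuralValue` machinery.

Byte equality is only meaningful when the encoding is deterministic for the values being compared. It is for two instances of the same simple class with equal fields, but that is not guaranteed for arbitrary object graphs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** A toy "source" that, like many real sources, does not override equals(). */
final class ToyRangeSource implements Serializable {
  private static final long serialVersionUID = 1L;
  final long from;
  final long to;

  ToyRangeSource(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

/** Hypothetical structural value: equals and hashCode are defined over the encoded bytes. */
final class EncodedStructuralValue {
  private final byte[] bytes;

  EncodedStructuralValue(Serializable value) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Read the buffer only after the stream is closed (and therefore flushed).
    this.bytes = buffer.toByteArray();
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof EncodedStructuralValue
        && Arrays.equals(bytes, ((EncodedStructuralValue) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }
}
```

Two separately constructed `ToyRangeSource(0, 10)` instances are not equal under `equals()`, because it is inherited from `Object` and compares identity. Their `EncodedStructuralValue` wrappers do compare equal.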


Issue Time Tracking
---

Worklog Id: (was: 80938)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-2898) Flink supports chaining/fusion of single-SDK stages

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2898?focusedWorklogId=80925=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80925
 ]

ASF GitHub Bot logged work on BEAM-2898:


Author: ASF GitHub Bot
Created on: 15/Mar/18 18:51
Start Date: 15/Mar/18 18:51
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4783: [BEAM-2898] Support 
Impulse transforms in Flink batch runner
URL: https://github.com/apache/beam/pull/4783#issuecomment-373484841
 
 
   You'll need to rebase


Issue Time Tracking
---

Worklog Id: (was: 80925)
Time Spent: 2h 10m  (was: 2h)
