[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-12-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179679
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 29/Dec/18 11:47
Start Date: 29/Dec/18 11:47
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7359: [BEAM-4681] Address 
synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#issuecomment-450487164
 
 
   >Out of curiosity, how did you discover this issue? It might be worth adding a 
comment to the JIRA ticket.
   
   I found the issue while writing the test for #7362. The test uses only 
timers in the first operator, without any state requests. State requests 
would otherwise set the state backend key, which is required for keying 
the pending timers map state, so I got a "key not initialized" error. In cases 
where the key had been set by a state request, pending timers would have been 
associated with the wrong key, which would have led to problems deleting or 
resetting them.
   
   I'm also adding this to the JIRA.
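
The failure mode described above can be illustrated with a minimal sketch. It assumes a hypothetical stand-in class, `KeyedTimerStateSketch`, that is not Flink's or Beam's API: a backend that files pending timers under whatever key was set last, and that fails if no key was ever set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a keyed state backend; not Flink's or Beam's API. */
final class KeyedTimerStateSketch {
  private String currentKey; // stays null until some caller sets it
  private final Map<String, Set<String>> pendingTimersByKey = new HashMap<>();

  void setCurrentKey(String key) {
    this.currentKey = key;
  }

  /** Records a pending timer under whatever key is currently set. */
  void addPendingTimer(String timerId) {
    if (currentKey == null) {
      // Mirrors the "key not initialized" case: timers only, no prior state request.
      throw new IllegalStateException("key not initialized");
    }
    pendingTimersByKey.computeIfAbsent(currentKey, k -> new HashSet<>()).add(timerId);
  }

  /** Returns the timers recorded for the given key, or an empty set. */
  Set<String> pendingTimers(String key) {
    return pendingTimersByKey.getOrDefault(key, new HashSet<>());
  }
}
```

If a state request for key "a" ran last and a timer for key "b" is then recorded without updating the key, the timer ends up under "a", so a later lookup or delete for "b" would not find it.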
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 179679)
Time Spent: 26h 10m  (was: 26h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 26h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-12-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179676&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179676
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 29/Dec/18 11:32
Start Date: 29/Dec/18 11:32
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #7359: [BEAM-4681] 
Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#discussion_r244481038
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nonnull;
+
+/**
+ * A lock which can always be acquired. It should not be used when a proper lock is required, but it
+ * is useful as a performance optimization when locking is not necessary but the code paths have to
+ * be shared between the locking and the non-locking variant.
+ *
+ * For example, in {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}, the
+ * locking on the state backend is only required when both timers and state are used.
+ */
+public class NoopLock implements Lock, Serializable {
 
 Review comment:
   I thought about adding it to the SDK, but I wasn't sure how useful it 
would be for reuse. The use case is a fairly special optimization. I don't 
know how much performance it gains, as this depends on the number of state 
requests, but it also gives people reading the Runner code a hint about how 
the locking granularity works.
   
   There is a good chance that someone who needs it will find it and move it to a 
different location.
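
For readers who have not seen the pattern, here is a minimal sketch of a lock that can always be acquired, under the class declaration quoted above. The class names `NoopLockSketch` and `LockChoiceSketch`, the method bodies, and the behaviour of `newCondition()` are assumptions for illustration, not the code from the pull request.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a lock that never blocks; illustrative only, not the PR's NoopLock. */
final class NoopLockSketch implements Lock, Serializable {
  @Override
  public void lock() {}

  @Override
  public void lockInterruptibly() {}

  @Override
  public boolean tryLock() {
    return true; // acquisition always succeeds
  }

  @Override
  public boolean tryLock(long time, TimeUnit unit) {
    return true;
  }

  @Override
  public void unlock() {}

  @Override
  public Condition newCondition() {
    // Assumption: a no-op lock has nothing meaningful to wait on.
    throw new UnsupportedOperationException("conditions are not supported");
  }
}

/** Hypothetical helper: share one code path, pick the lock once up front. */
final class LockChoiceSketch {
  static Lock chooseLock(boolean usesTimersAndState) {
    return usesTimersAndState ? new ReentrantLock() : new NoopLockSketch();
  }
}
```

The calling code can then always write `lock.lock(); try { ... } finally { lock.unlock(); }` and only pays for real locking when both timers and state are in use.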
 



Issue Time Tracking
---

Worklog Id: (was: 179676)
Time Spent: 26h  (was: 25h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 26h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-12-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179609
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 29/Dec/18 04:04
Start Date: 29/Dec/18 04:04
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #7359: [BEAM-4681] 
Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#discussion_r29305
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nonnull;
+
+/**
+ * A lock which can always be acquired. It should not be used when a proper lock is required, but it
+ * is useful as a performance optimization when locking is not necessary but the code paths have to
+ * be shared between the locking and the non-locking variant.
+ *
+ * For example, in {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}, the
+ * locking on the state backend is only required when both timers and state are used.
+ */
+public class NoopLock implements Lock, Serializable {
 
 Review comment:
   Since this may be useful outside of the Flink runner, maybe move it to a 
common package like `org.apache.beam.sdk.util`?
 



Issue Time Tracking
---

Worklog Id: (was: 179609)
Time Spent: 25h 50m  (was: 25h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 25h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-12-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179532
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 28/Dec/18 18:48
Start Date: 28/Dec/18 18:48
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7359: [BEAM-4681] Address 
synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#issuecomment-450408419
 
 
   No new test failures in `PortableValidatesRunner Streaming`. We need to 
address those separately: https://issues.apache.org/jira/browse/BEAM-6326
 



Issue Time Tracking
---

Worklog Id: (was: 179532)
Time Spent: 25h 40m  (was: 25.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 25h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-12-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179494
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 28/Dec/18 15:24
Start Date: 28/Dec/18 15:24
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7359: [BEAM-4681] Address 
synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#issuecomment-450376249
 
 
   Run Java Flink PortableValidatesRunner Streaming
 



Issue Time Tracking
---

Worklog Id: (was: 179494)
Time Spent: 25.5h  (was: 25h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 25.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-12-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179131&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179131
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 27/Dec/18 16:07
Start Date: 27/Dec/18 16:07
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #7359: [BEAM-4681] 
Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359
 
 
   This fixes a regression of BEAM-867 where the key needs to be set on the state
   backend to support deleting pending timers on timer registration or firing. This
   can interfere with accessing user-defined state, and could also interfere when a
   timer is set at the same time that a timer fires.
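
One way to picture the synchronization this description calls for is the sketch below. It assumes a hypothetical helper, `KeyScopedAccessSketch`, whose names are not taken from the pull request: setting the key and running the operation that depends on it happen under one lock, so a concurrent timer registration, timer firing, or user state access cannot change the key in between.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper: the key is set and used under a single lock. */
final class KeyScopedAccessSketch {
  private final Lock stateBackendLock = new ReentrantLock();
  private Object currentKey; // guarded by stateBackendLock

  /** Sets the key, then runs the action while still holding the lock. */
  <T> T withKey(Object key, Supplier<T> action) {
    stateBackendLock.lock();
    try {
      currentKey = key;
      return action.get();
    } finally {
      stateBackendLock.unlock();
    }
  }

  /** Intended to be called from inside an action passed to withKey. */
  Object currentKey() {
    return currentKey;
  }
}
```

Every code path that touches keyed state would go through `withKey`, which is the part this sketch is meant to highlight; how the real operator structures this is not shown here.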
   
   
 



Issue Time Tracking
---

Worklog Id: (was: 179131)
Time Spent: 25h 20m  (was: 25h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 25h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167434&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167434
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 16:04
Start Date: 19/Nov/18 16:04
Worklog Time Spent: 10m 
  Work Description: mxm closed pull request #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index ff1d38b859f..0147085ab34 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -243,8 +243,6 @@ class BeamModulePlugin implements Plugin {
   excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
   excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
   excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-  // TODO Enable test once timer-support for batch is merged
-  excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
   //SplitableDoFnTests
   excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
   excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 2ed31b8ba48..3f26d5e2d99 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.flink;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 
 import com.google.common.collect.BiMap;
@@ -55,7 +57,6 @@
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvKeySelector;
-import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
@@ -299,16 +300,13 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli
 
   private static  void translateExecutableStage(
   PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
-// TODO: Fail on stateful DoFns for now.
-// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
 // TODO: Fail on splittable DoFns.
 // TODO: Special-case single outputs to avoid multiplexing PCollections.
 
 RunnerApi.Components components = pipeline.getComponents();
 Map outputs = transform.getTransform().getOutputsMap();
 // Mapping from PCollection id to coder tag id.
-BiMap outputMap =
-FlinkPipelineTranslatorUtils.createOutputMap(outputs.values());
+BiMap outputMap = createOutputMap(outputs.values());
 // Collect all output Coders and create a UnionCoder for our tagged outputs.
 List> unionCoders = Lists.newArrayList();
 // Enforce tuple tag sorting by union tag index.
@@ -338,21 +336,22 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli
 }
 
 String inputPCollectionId = stagePayload.getInput();
+Coder> windowedInputCoder =
+instantiateCoder(inputPCollectionId, components);
+
 DataSet> inputDataSet = context.getDataSetOrThrow(inputPCollectionId);
 
-final boolean stateful = stagePayload.getUserStatesCount() > 0;
 final FlinkExecutableStageFunction function =
 new FlinkExecutableStageFunction<>(
 stagePayload,
 context.getJobInfo(),
 outputMap,
 

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167422
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 15:46
Start Date: 19/Nov/18 15:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439938919
 
 
   Run Website PreCommit




Issue Time Tracking
---

Worklog Id: (was: 167422)
Time Spent: 25h  (was: 24h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 25h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167406&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167406
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 14:37
Start Date: 19/Nov/18 14:37
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234641503
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
 
 Review comment:
   Stage-by-stage scheduling is only in place if the `ExecutionMode` is set to 
`BATCH_FORCED` instead of the default `PIPELINED`.
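
The `synchronized (collectorLock)` block in the hunk above matters when receivers for different outputs can run on different threads at the same time. A minimal sketch of that pattern follows; `SynchronizedCollectorSketch` is a hypothetical class, and a plain list stands in for a collector that is assumed not to be thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: several receivers share one non-thread-safe collector. */
final class SynchronizedCollectorSketch {
  private final Object collectorLock = new Object();
  private final List<String> collected = new ArrayList<>(); // guarded by collectorLock

  /** Returns a receiver that tags each element and emits it under the shared lock. */
  Consumer<String> receiverFor(int unionTag) {
    return element -> {
      synchronized (collectorLock) {
        collected.add(unionTag + ":" + element);
      }
    };
  }

  /** Number of elements emitted so far. */
  int size() {
    synchronized (collectorLock) {
      return collected.size();
    }
  }
}
```

If the stages really do run one after another, the lock is uncontended and cheap; if they are pipelined, it keeps concurrent receivers from corrupting the shared collector.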




Issue Time Tracking
---

Worklog Id: (was: 167406)
Time Spent: 24h 50m  (was: 24h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 24h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167405
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 14:33
Start Date: 19/Nov/18 14:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439912526
 
 
   Run Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 167405)
Time Spent: 24h 40m  (was: 24.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 24h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167400
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 14:14
Start Date: 19/Nov/18 14:14
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234629349
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
 
 Review comment:
   Yes, but I don't know whether it applies to batch, since batch is scheduled 
stage by stage. For streaming, the issue would surface just by running 
wordcount; for batch it would not.




Issue Time Tracking
---

Worklog Id: (was: 167400)
Time Spent: 24h 20m  (was: 24h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 24h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167398&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167398
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 14:13
Start Date: 19/Nov/18 14:13
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439905785
 
 
   Run Website PreCommit




Issue Time Tracking
---

Worklog Id: (was: 167398)
Time Spent: 24h 10m  (was: 24h)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167392
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 13:54
Start Date: 19/Nov/18 13:54
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439899958
 
 
   Run Website PreCommit


Issue Time Tracking
---

Worklog Id: (was: 167392)
Time Spent: 24h  (was: 23h 50m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167385
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 13:33
Start Date: 19/Nov/18 13:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439894046
 
 
   Run Python Flink ValidatesRunner


Issue Time Tracking
---

Worklog Id: (was: 167385)
Time Spent: 23h 50m  (was: 23h 40m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167384&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167384
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 13:33
Start Date: 19/Nov/18 13:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439893905
 
 
   Squashed for merge.


Issue Time Tracking
---

Worklog Id: (was: 167384)
Time Spent: 23h 40m  (was: 23.5h)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167359
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 11:50
Start Date: 19/Nov/18 11:50
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439867543
 
 
   Run Python Flink ValidatesRunner


Issue Time Tracking
---

Worklog Id: (was: 167359)
Time Spent: 23.5h  (was: 23h 20m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167358
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 11:50
Start Date: 19/Nov/18 11:50
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439867471
 
 
   Run Website PreCommit


Issue Time Tracking
---

Worklog Id: (was: 167358)
Time Spent: 23h 20m  (was: 23h 10m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167336&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167336
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 10:26
Start Date: 19/Nov/18 10:26
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234565020
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map 
outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", 
collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
 
 Review comment:
   Good question. I think you are right. This can deadlock if the receiving 
side backpressures because it is waiting for input from another channel 
generated by this operator. Is that what you meant?


Issue Time Tracking
---

Worklog Id: (was: 167336)
Time Spent: 23h 10m  (was: 23h)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167333
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 10:08
Start Date: 19/Nov/18 10:08
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234558497
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##
 @@ -927,22 +941,53 @@ public void setTimer(
 /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}. */
 @Deprecated
 @Override
-public void setTimer(TimerData timerKey) {
-  long time = timerKey.getTimestamp().getMillis();
-  switch (timerKey.getDomain()) {
+public void setTimer(TimerData timer) {
+  try {
+getKeyedStateBackend().setCurrentKey(getCurrentKey());
+String uniqueTimerId = getUniqueTimerId(timer);
 
 Review comment:
   It's because a new timer with timerId X resets the currently pending timer X 
in the same context. So we have to look up and remove the current timer and 
then set the new timer, which involves adding it back to the "pending timers" 
map.
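
   The reset behavior described above can be sketched as follows. This is a minimal, self-contained illustration and not the code from the pull request: the class `PendingTimersSketch`, its plain `HashMap`, and the `registered` set standing in for the runner's timer service are all assumptions made for the example, and per-key scoping of the state is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of "pending timers" bookkeeping (not Beam's actual code). */
public class PendingTimersSketch {

  /** Timer id -> timestamp (millis) of the timer that has not fired yet. */
  private final Map<String, Long> pendingTimersById = new HashMap<>();

  /** Stand-in for the runner's timer service: one entry per registered (id, timestamp). */
  private final Set<String> registered = new HashSet<>();

  /** Sets a timer; an already pending timer with the same id is removed first. */
  public void setTimer(String timerId, long timestampMillis) {
    Long previous = pendingTimersById.remove(timerId);
    if (previous != null) {
      // Without this lookup the old registration would remain and still fire.
      registered.remove(registrationKey(timerId, previous));
    }
    registerTimer(timerId, timestampMillis);
  }

  /** Registers the timer and records it as pending again. */
  private void registerTimer(String timerId, long timestampMillis) {
    registered.add(registrationKey(timerId, timestampMillis));
    pendingTimersById.put(timerId, timestampMillis);
  }

  private static String registrationKey(String timerId, long timestampMillis) {
    return timerId + "@" + timestampMillis;
  }

  public int registeredCount() {
    return registered.size();
  }

  public Long pendingTimestamp(String timerId) {
    return pendingTimersById.get(timerId);
  }

  public static void main(String[] args) {
    PendingTimersSketch timers = new PendingTimersSketch();
    timers.setTimer("X", 10L);
    timers.setTimer("X", 20L); // resets the pending timer X
    System.out.println(timers.registeredCount() + " " + timers.pendingTimestamp("X"));
  }
}
```

   In this sketch, setting timer X twice leaves exactly one registration, at the later timestamp, which is why the remove and the put both touch the same map entry.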


Issue Time Tracking
---

Worklog Id: (was: 167333)
Time Spent: 23h  (was: 22h 50m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167327
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 09:42
Start Date: 19/Nov/18 09:42
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234549296
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##
 @@ -918,6 +922,16 @@ public TimerInternals timerInternals() {
 
   class FlinkTimerInternals implements TimerInternals {
 
+/** Pending timers which are necessary for supporting removal of existing 
timers. */
 
 Review comment:
   I'll clarify. Basically, these are timers which have not fired yet.


Issue Time Tracking
---

Worklog Id: (was: 167327)
Time Spent: 22h 50m  (was: 22h 40m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167326
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 19/Nov/18 09:40
Start Date: 19/Nov/18 09:40
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234548677
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler(
   public void mapPartition(
   Iterable> iterable, Collector 
collector)
   throws Exception {
-processElements(iterable, collector);
+
+ReceiverFactory receiverFactory = new ReceiverFactory(collector, 
outputMap);
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+  processElements(iterable, bundle);
+}
   }
 
-  /** For stateful processing via a GroupReduceFunction. */
+  /** For stateful and timer processing via a GroupReduceFunction. */
   @Override
   public void reduce(Iterable> iterable, 
Collector collector)
   throws Exception {
-bagUserStateHandlerFactory.resetForNewKey();
-processElements(iterable, collector);
+
+// Need to discard the old key's state
+if (bagUserStateHandlerFactory != null) {
+  bagUserStateHandlerFactory.resetForNewKey();
+}
+
+// Used with Batch, we know that all the data is available for this key. 
We can't use the
+// timer manager from the context because it doesn't exist. So we create 
one and advance
+// time to the end after processing all elements.
+final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+timerInternals.advanceProcessingTime(Instant.now());
+timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+ReceiverFactory receiverFactory =
+new ReceiverFactory(
+collector,
+outputMap,
+new TimerReceiverFactory(
+stageBundleFactory,
+executableStage.getTimers(),
+
stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
+(WindowedValue timerElement, TimerInternals.TimerData 
timerData) -> {
+  currentTimerKey = (((KV) timerElement.getValue()).getKey());
+  timerInternals.setTimer(timerData);
+},
+windowCoder));
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+  processElements(iterable, bundle);
+}
+
+// Finish any pending windows by advancing the input watermark to infinity.
+timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+// Finally, advance the processing time to infinity to fire any timers.
+timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+
+  fireEligibleTimers(
+  timerInternals,
+  (String timerId, WindowedValue timerValue) -> {
+FnDataReceiver> fnTimerReceiver =
+bundle.getInputReceivers().get(timerId);
+Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver 
found for %s", timerId);
+try {
+  fnTimerReceiver.accept(timerValue);
+} catch (Exception e) {
+  throw new RuntimeException(
+  String.format(Locale.ENGLISH, "Failed to process timer: %s", 
timerValue));
+}
+  });
+}
   }
 
-  private void processElements(
-  Iterable> iterable, Collector 
collector)
+  private void processElements(Iterable> iterable, 
RemoteBundle bundle)
   throws Exception {
-checkState(
-runtimeContext == getRuntimeContext(),
-"RuntimeContext changed from under us. State handler invalid.");
-checkState(
-stageBundleFactory != null, "%s not yet prepared", 
StageBundleFactory.class.getName());
-checkState(
-stateRequestHandler != null, "%s not yet prepared", 
StateRequestHandler.class.getName());
+Preconditions.checkArgument(bundle != null, "RemoteBundle must not be 
null");
+
+String inputPCollectionId = executableStage.getInputPCollection().getId();
+FnDataReceiver> mainReceiver =
+Preconditions.checkNotNull(
+bundle.getInputReceivers().get(inputPCollectionId),
+"Main 

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167268&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167268
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234464322
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##
 @@ -918,6 +922,16 @@ public TimerInternals timerInternals() {
 
   class FlinkTimerInternals implements TimerInternals {
 
+/** Pending timers which are necessary for supporting removal of existing 
timers. */
+private final MapState pendingTimersById;
+
+private FlinkTimerInternals() {
+  MapStateDescriptor pendingTimersByIdStateDescriptor =
+  new MapStateDescriptor<>(
+  "timer-dedup", new StringSerializer(), new 
CoderTypeSerializer<>(timerCoder));
 
 Review comment:
   "timer-dedup" => "pending-timers" ?


Issue Time Tracking
---

Worklog Id: (was: 167268)
Time Spent: 22h 10m  (was: 22h)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167271
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234464396
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##
 @@ -927,22 +941,53 @@ public void setTimer(
 /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}. */
 @Deprecated
 @Override
-public void setTimer(TimerData timerKey) {
-  long time = timerKey.getTimestamp().getMillis();
-  switch (timerKey.getDomain()) {
+public void setTimer(TimerData timer) {
+  try {
+getKeyedStateBackend().setCurrentKey(getCurrentKey());
+String uniqueTimerId = getUniqueTimerId(timer);
 
 Review comment:
   The logic here also needs explanation. We first remove the pending timer, but 
then it is put back in `registerTimer`?


Issue Time Tracking
---

Worklog Id: (was: 167271)
Time Spent: 22.5h  (was: 22h 20m)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167267
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234463725
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler(
   public void mapPartition(
   Iterable> iterable, Collector 
collector)
   throws Exception {
-processElements(iterable, collector);
+
+ReceiverFactory receiverFactory = new ReceiverFactory(collector, 
outputMap);
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+  processElements(iterable, bundle);
+}
   }
 
-  /** For stateful processing via a GroupReduceFunction. */
+  /** For stateful and timer processing via a GroupReduceFunction. */
   @Override
   public void reduce(Iterable> iterable, 
Collector collector)
   throws Exception {
-bagUserStateHandlerFactory.resetForNewKey();
-processElements(iterable, collector);
+
+// Need to discard the old key's state
+if (bagUserStateHandlerFactory != null) {
+  bagUserStateHandlerFactory.resetForNewKey();
+}
+
+// Used with Batch, we know that all the data is available for this key. 
We can't use the
+// timer manager from the context because it doesn't exist. So we create 
one and advance
+// time to the end after processing all elements.
+final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+timerInternals.advanceProcessingTime(Instant.now());
+timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+ReceiverFactory receiverFactory =
+new ReceiverFactory(
+collector,
+outputMap,
+new TimerReceiverFactory(
+stageBundleFactory,
+executableStage.getTimers(),
+
stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
+(WindowedValue timerElement, TimerInternals.TimerData 
timerData) -> {
+  currentTimerKey = (((KV) timerElement.getValue()).getKey());
+  timerInternals.setTimer(timerData);
+},
+windowCoder));
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+  processElements(iterable, bundle);
+}
+
+// Finish any pending windows by advancing the input watermark to infinity.
+timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+// Finally, advance the processing time to infinity to fire any timers.
+timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+
+  fireEligibleTimers(
+  timerInternals,
+  (String timerId, WindowedValue timerValue) -> {
+FnDataReceiver> fnTimerReceiver =
+bundle.getInputReceivers().get(timerId);
+Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver 
found for %s", timerId);
+try {
+  fnTimerReceiver.accept(timerValue);
+} catch (Exception e) {
+  throw new RuntimeException(
+  String.format(Locale.ENGLISH, "Failed to process timer: %s", 
timerValue));
+}
+  });
+}
   }
 
-  private void processElements(
-  Iterable> iterable, Collector 
collector)
+  private void processElements(Iterable> iterable, 
RemoteBundle bundle)
   throws Exception {
-checkState(
-runtimeContext == getRuntimeContext(),
-"RuntimeContext changed from under us. State handler invalid.");
-checkState(
-stageBundleFactory != null, "%s not yet prepared", 
StageBundleFactory.class.getName());
-checkState(
-stateRequestHandler != null, "%s not yet prepared", 
StateRequestHandler.class.getName());
+Preconditions.checkArgument(bundle != null, "RemoteBundle must not be 
null");
+
+String inputPCollectionId = executableStage.getInputPCollection().getId();
+FnDataReceiver> mainReceiver =
+Preconditions.checkNotNull(
+bundle.getInputReceivers().get(inputPCollectionId),
+"Main 

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167269
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234463917
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map 
outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", 
collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
 
 Review comment:
   In streaming, doing this in the RPC handler can cause a deadlock when collect 
triggers the next bundle in an operator chain. Presumably that cannot happen 
due to different scheduling in batch?
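
   One common way to keep a receiving thread from calling into downstream operators is to have it only enqueue elements and let the operator's own thread emit them. The sketch below illustrates that pattern under stated assumptions: `BufferedOutputSketch`, `accept`, and `drainTo` are hypothetical names, the queue is unbounded, and nothing here is taken from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch: decouple the receiving thread from downstream emission. */
public class BufferedOutputSketch<T> {

  /** Unbounded, thread-safe buffer between the receiving thread and the operator thread. */
  private final BlockingQueue<T> outputQueue = new LinkedBlockingQueue<>();

  /** Called on the receiving thread: only enqueues, never calls downstream code. */
  public void accept(T element) {
    outputQueue.add(element);
  }

  /** Called on the operator's own thread: emits everything buffered so far. */
  public int drainTo(Consumer<? super T> collector) {
    int emitted = 0;
    T next;
    while ((next = outputQueue.poll()) != null) {
      collector.accept(next);
      emitted++;
    }
    return emitted;
  }

  public static void main(String[] args) throws InterruptedException {
    BufferedOutputSketch<String> buffer = new BufferedOutputSketch<>();
    // Simulated receiving thread that hands over two elements.
    Thread receiver = new Thread(() -> {
      buffer.accept("a");
      buffer.accept("b");
    });
    receiver.start();
    receiver.join();

    List<String> collected = new ArrayList<>();
    int emitted = buffer.drainTo(collected::add);
    System.out.println(emitted + " " + collected);
  }
}
```

   The trade-off in this sketch is memory: an unbounded queue never blocks the receiver, but it can grow if the operator thread drains slowly.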


Issue Time Tracking
---

Worklog Id: (was: 167269)
Time Spent: 22h 10m  (was: 22h)



[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167270
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234464176
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
+  }
+};
+  } else if (timerReceiverFactory != null) {
+// Delegate to TimerReceiverFactory
+return timerReceiverFactory.create(collectionId);
+  } else {
+throw new IllegalStateException(
+String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId));
+  }
+}
+  }
+
+  private static class TimerReceiverFactory implements OutputReceiverFactory {
+
+private final StageBundleFactory stageBundleFactory;
+/** Timer PCollection id => TimerReference. */
+private final HashMap timerOutputIdToSpecMap;
+/** Timer PCollection id => timer name => TimerSpec. */
+private final Map> timerSpecMap;
+
+private final BiConsumer timerDataConsumer;
+private final Coder windowCoder;
+
+TimerReceiverFactory(
+StageBundleFactory stageBundleFactory,
+Collection timerReferenceCollection,
+Map> timerSpecMap,
+BiConsumer timerDataConsumer,
+Coder windowCoder) {
+  this.stageBundleFactory = stageBundleFactory;
+  this.timerOutputIdToSpecMap = new HashMap<>();
+  // Gather all timers from all transforms by their output pCollectionId which is unique
+  for (Map transformTimerMap :
+  stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
+for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
+  timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
+}
+  }
+  this.timerSpecMap = timerSpecMap;
+  this.timerDataConsumer = timerDataConsumer;
+  this.windowCoder = windowCoder;
+}
+
+@Override
+public  FnDataReceiver create(String pCollectionId) {
+  final ProcessBundleDescriptors.TimerSpec timerSpec =
+  timerOutputIdToSpecMap.get(pCollectionId);
+
   return receivedElement -> {
-synchronized (collectorLock) {
-  collector.collect(new RawUnionValue(tagInt, receivedElement));
+WindowedValue windowedValue = (WindowedValue) receivedElement;
+Timer timer =
+Preconditions.checkNotNull(
+(Timer) ((KV) windowedValue.getValue()).getValue(),
+"Received null Timer from SDK harness: %s",
+receivedElement);
+LOG.info("Timer received: {} {}", pCollectionId, timer);
 
 Review comment:
   Use debug level here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 167270)
Time Spent: 22h 20m  (was: 22h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167272
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234464294
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##
 @@ -918,6 +922,16 @@ public TimerInternals timerInternals() {
 
   class FlinkTimerInternals implements TimerInternals {
 
+/** Pending timers which are necessary for supporting removal of existing timers. */
 
 Review comment:
   What are "pending timers"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 167272)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 22.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166944&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166944
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 16:39
Start Date: 16/Nov/18 16:39
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439451923
 
 
   Same goes for "Run JavaPortabilityApi PreCommit" which fails continuously at 
the moment for all commits. My PR doesn't change anything regarding the 
portability API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166944)
Time Spent: 21h 50m  (was: 21h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 21h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166943&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166943
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 16:38
Start Date: 16/Nov/18 16:38
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439451545
 
 
   Unfortunately, the Jenkins setup for it seems to be broken at the moment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166943)
Time Spent: 21h 40m  (was: 21.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 21h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166940&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166940
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 16:38
Start Date: 16/Nov/18 16:38
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439451306
 
 
   Ran "Java Flink PortableValidatesRunner" locally with the latest master. All 
tests passed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166940)
Time Spent: 21.5h  (was: 21h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 21.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166882
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:14
Start Date: 16/Nov/18 14:14
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439405481
 
 
   Run Python Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166882)
Time Spent: 21h  (was: 20h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 21h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166881&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166881
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:14
Start Date: 16/Nov/18 14:14
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439405438
 
 
   Run Python Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166881)
Time Spent: 20h 50m  (was: 20h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 20h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166892&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166892
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:20
Start Date: 16/Nov/18 14:20
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439407400
 
 
   Run Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166892)
Time Spent: 21h 20m  (was: 21h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 21h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166891
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:20
Start Date: 16/Nov/18 14:20
Worklog Time Spent: 10m 
  Work Description: mxm removed a comment on issue #7008: [BEAM-4681] Add 
support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439405481
 
 
   Run Python Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166891)
Time Spent: 21h 10m  (was: 21h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 21h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166876
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:05
Start Date: 16/Nov/18 14:05
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439402850
 
 
   Run Python Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166876)
Time Spent: 20h 40m  (was: 20.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 20h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166874&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166874
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:02
Start Date: 16/Nov/18 14:02
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439401903
 
 
   Run Java Flink PortableValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166874)
Time Spent: 20.5h  (was: 20h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 20.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166873&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166873
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 14:02
Start Date: 16/Nov/18 14:02
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439401869
 
 
   Run Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166873)
Time Spent: 20h 20m  (was: 20h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 20h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166851
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 12:06
Start Date: 16/Nov/18 12:06
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439373857
 
 
   "Run Java Flink PortableValidatesRunner" seems to be broken due to Docker 
container issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166851)
Time Spent: 20h 10m  (was: 20h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 20h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166848&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166848
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 12:04
Start Date: 16/Nov/18 12:04
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439373448
 
 
   Run Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166848)
Time Spent: 19h 40m  (was: 19.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 19h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166849
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 12:04
Start Date: 16/Nov/18 12:04
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439373531
 
 
   Run Python Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166849)
Time Spent: 19h 50m  (was: 19h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166850
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 12:04
Start Date: 16/Nov/18 12:04
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439373552
 
 
   Run Java Flink PortableValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166850)
Time Spent: 20h  (was: 19h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 20h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166841&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166841
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 11:09
Start Date: 16/Nov/18 11:09
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439361428
 
 
   Run Flink ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166841)
Time Spent: 19.5h  (was: 19h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.9.0
>
>  Time Spent: 19.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166838&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166838
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 10:35
Start Date: 16/Nov/18 10:35
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439352585
 
 
   Run Java Flink PortableValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166838)
Time Spent: 19h 20m  (was: 19h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 19h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166830&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166830
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 16/Nov/18 10:18
Start Date: 16/Nov/18 10:18
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439347844
 
 
   Run Java PreCommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 166830)
Time Spent: 19h  (was: 18h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 19h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166629
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 15/Nov/18 20:06
Start Date: 15/Nov/18 20:06
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439172785
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 166629)
Time Spent: 18h 40m  (was: 18.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 18h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166630
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 15/Nov/18 20:06
Start Date: 15/Nov/18 20:06
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-439172809
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 166630)
Time Spent: 18h 50m  (was: 18h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 18h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166016=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166016
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 18:52
Start Date: 14/Nov/18 18:52
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bd83ad1ea47..ff1d38b859f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -243,6 +243,7 @@ class BeamModulePlugin implements Plugin {
   excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
   excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
   excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+  // TODO Enable test once timer-support for batch is merged
   excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
   //SplitableDoFnTests
   excludeCategories 
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
@@ -1471,6 +1472,9 @@ artifactId=${project.name}
 
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
 "--jobServerDriver=${config.jobServerDriver}",
 "--environmentCacheMillis=1",
+// TODO Create two tasks to run for both batch and streaming:
+// https://issues.apache.org/jira/browse/BEAM-6009
+// "--streaming"
   ]
   if (config.jobServerConfig) {
 
beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index bf25dc65013..a8adc97edcf 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -562,8 +563,7 @@ private void translateStreamingImpulse(
 
 Coder keyCoder = null;
 KeySelector, ?> keySelector = null;
-final boolean stateful = stagePayload.getUserStatesCount() > 0;
-if (stateful) {
+if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() 
> 0) {
   // Stateful stages are only allowed of KV input
   Coder valueCoder =
   ((WindowedValue.FullWindowedValueCoder) 
windowedInputCoder).getValueCoder();
@@ -601,6 +601,7 @@ private void translateStreamingImpulse(
 context.getJobInfo(),
 FlinkExecutableStageContext.factory(context.getPipelineOptions()),
 collectionIdToTupleTag,
+getWindowingStrategy(inputPCollectionId, components),
 keyCoder,
 keySelector);
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
index 1710a27da7c..2ea3b9323d3 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
@@ -22,10 +22,14 @@
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165978=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165978
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 17:45
Start Date: 14/Nov/18 17:45
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233550751
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +534,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
 
 Review comment:
   Neat. With your fix I was able to build a `timerOutputIdToSpecMap` that maps 
each output collection id to its TimerSpec. This also got rid of some boilerplate 
code that I previously used to extract the TimerSpec.
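
   A minimal sketch of such a lookup, not the code from the PR: `TimerSpecInfo` 
and its fields are hypothetical stand-ins for the runner's TimerSpec, and the 
ids are the ones quoted elsewhere in this thread. It only illustrates keying the 
specs by output collection id, so that no name has to be derived while emitting 
results.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TimerOutputLookup {

  /** Hypothetical stand-in for a timer spec; this is not Beam's TimerSpec class. */
  static final class TimerSpecInfo {
    final String timerId;
    final String outputCollectionId;
    final String timeDomain;

    TimerSpecInfo(String timerId, String outputCollectionId, String timeDomain) {
      this.timerId = timerId;
      this.outputCollectionId = outputCollectionId;
      this.timeDomain = timeDomain;
    }
  }

  /** Indexes the specs by the id of the collection on which timers are emitted. */
  static Map<String, TimerSpecInfo> buildTimerOutputIdToSpecMap(List<TimerSpecInfo> specs) {
    Map<String, TimerSpecInfo> byOutputId = new HashMap<>();
    for (TimerSpecInfo spec : specs) {
      TimerSpecInfo previous = byOutputId.put(spec.outputCollectionId, spec);
      if (previous != null) {
        // Two timers sharing one output collection would make the lookup ambiguous.
        throw new IllegalArgumentException(
            "Duplicate timer output collection id: " + spec.outputCollectionId);
      }
    }
    return byOutputId;
  }

  public static void main(String[] args) {
    // Assumed example data; the ids mirror the ones mentioned in the review.
    Map<String, TimerSpecInfo> specs =
        buildTimerOutputIdToSpecMap(
            Arrays.asList(new TimerSpecInfo("foo", "ParDo.timer.foo.out:0", "EVENT_TIME")));
    TimerSpecInfo spec = specs.get("ParDo.timer.foo.out:0");
    System.out.println(spec.timerId + " " + spec.timeDomain);
  }
}
```

   Rejecting duplicate keys is a choice made for this sketch; the actual 
operator may handle that case differently.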




Issue Time Tracking
---

Worklog Id: (was: 165978)
Time Spent: 18h  (was: 17h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 18h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165980=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165980
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 17:45
Start Date: 14/Nov/18 17:45
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438753473
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165980)
Time Spent: 18h 20m  (was: 18h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 18h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165979=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165979
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 17:45
Start Date: 14/Nov/18 17:45
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438753446
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165979)
Time Spent: 18h 10m  (was: 18h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 18h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165953=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165953
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 16:32
Start Date: 14/Nov/18 16:32
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233522450
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +534,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
 
 Review comment:
   https://github.com/mxm/beam/pull/1 This also avoids hacks to guess the 
output name from the input name. 




Issue Time Tracking
---

Worklog Id: (was: 165953)
Time Spent: 17h 50m  (was: 17h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 17h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165945=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165945
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 16:17
Start Date: 14/Nov/18 16:17
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438721581
 
 
   The PVR tests are extremely slow because the Docker environment takes a long 
time to come up for every test:
   
   ```
   INFO 
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - 
Still waiting for startup of environment 
jenkins-docker-apache.bintray.io/beam/java for worker id 1
   ```
   
   Eventually, the 100-minute timeout kills the job.




Issue Time Tracking
---

Worklog Id: (was: 165945)
Time Spent: 17h 20m  (was: 17h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 17h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165948=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165948
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 16:28
Start Date: 14/Nov/18 16:28
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233520762
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +534,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
 
 Review comment:
   OK, the problem is that there are multiple copies of components and 
pcollections floating around...
   
   The right way to solve this is to patch in 
https://github.com/mxm/beam/pull/1/commits/55baf15befea582a3fc7796f9c0a4a031c3485b3
 and then use stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs() 
in SdkHarnessDoFnRunner to construct timerReferenceMap or whatever else is 
needed, which has the full information. 




Issue Time Tracking
---

Worklog Id: (was: 165948)
Time Spent: 17.5h  (was: 17h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 17.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165904=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165904
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 14:50
Start Date: 14/Nov/18 14:50
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438687972
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165904)
Time Spent: 17h 10m  (was: 17h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 17h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165901=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165901
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 14:36
Start Date: 14/Nov/18 14:36
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233471781
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +534,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
 
 Review comment:
   I think it would be best to merge this PR and revert #6986. BEAM-5999 can 
then rebase and address this issue.




Issue Time Tracking
---

Worklog Id: (was: 165901)
Time Spent: 17h  (was: 16h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 17h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165884=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165884
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 13:55
Start Date: 14/Nov/18 13:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233443898
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +533,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
+setTimer(windowedValue, timerData);
+  }
+}
+  }
+}
+
+private String extractTimerPCollectionId(String outputPCollectionId) {
+  return stageBundleFactory
+  .getProcessBundleDescriptor()
+  .getProcessBundleDescriptor()
+  .getPcollectionsMap()
+  .get(outputPCollectionId)
+  .getUniqueName();
 
 Review comment:
   It is still necessary. The timerReferenceMap gives me `ParDo.timer.foo`, but 
while processing the timer elements I get `ParDo.timer.foo.out:0`. The issue 
seems to be that the output collection ids are not set correctly.




Issue Time Tracking
---

Worklog Id: (was: 165884)
Time Spent: 16h 40m  (was: 16.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 16h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165886
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 13:58
Start Date: 14/Nov/18 13:58
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233456585
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +533,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
+setTimer(windowedValue, timerData);
+  }
+}
+  }
+}
+
+private String extractTimerPCollectionId(String outputPCollectionId) {
+  return stageBundleFactory
+  .getProcessBundleDescriptor()
+  .getProcessBundleDescriptor()
+  .getPcollectionsMap()
+  .get(outputPCollectionId)
+  .getUniqueName();
 
 Review comment:
   >Unless I'm mistaken, extractTimerPCollectionId should not be needed at all 
anymore, as timerReferenceMap is indexed by outputPCollectionId itself.
   
   It is still necessary. The timerReferenceMap gives me `ParDo.timer.foo` as 
the output name of timer "foo", whereas at runtime it is 
`ParDo.timer.foo.out:0`. I believe that is where the problem lies.
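
   The mismatch described above can be made explicit with a small sketch. It 
assumes a plain `Map<String, String>` from runtime output PCollection id to 
unique name, standing in for the lookup through the process bundle descriptor. 
The class `TimerIdResolver` and its members are hypothetical and not part of 
Beam.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of Beam: resolves runtime output ids to timer ids. */
final class TimerIdResolver {
  // Assumption: runtime output PCollection id -> unique name,
  // e.g. "ParDo.timer.foo.out:0" -> "ParDo.timer.foo".
  private final Map<String, String> uniqueNameByOutputId;
  // Assumption: the keys of the timer reference map, e.g. "ParDo.timer.foo".
  private final Set<String> timerReferenceKeys;

  TimerIdResolver(Map<String, String> uniqueNameByOutputId, Set<String> timerReferenceKeys) {
    this.uniqueNameByOutputId = uniqueNameByOutputId;
    this.timerReferenceKeys = timerReferenceKeys;
  }

  /** Returns the timer reference key for a runtime output id, or fails with a descriptive error. */
  String resolve(String outputPCollectionId) {
    String uniqueName = uniqueNameByOutputId.get(outputPCollectionId);
    if (uniqueName == null) {
      throw new IllegalArgumentException("Unknown output PCollection: " + outputPCollectionId);
    }
    if (!timerReferenceKeys.contains(uniqueName)) {
      // The unique name exists but does not match any timer key, i.e. the names drifted apart.
      throw new IllegalStateException(
          outputPCollectionId + " resolved to " + uniqueName + ", which is not a known timer");
    }
    return uniqueName;
  }
}
```

   Failing with a message that names both ids makes a drift between the two 
naming schemes visible immediately, instead of surfacing later as a missing 
timer spec.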




Issue Time Tracking
---

Worklog Id: (was: 165886)
Time Spent: 16h 50m  (was: 16h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 16h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in 

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165883=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165883
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 13:55
Start Date: 14/Nov/18 13:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233455289
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +534,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
 
 Review comment:
   >Timers are absolutely determined by their transform name and local name; 
probably it makes sense to use that
   
   That's what is used here. The keys of `timerReferenceMap` are transform name 
+ local name. The `extractTimerPCollectionId` method maps the output collection 
to the timer name.
   
   >and then also have a mapping fullyQualifiedTimerName -> 
timerInputPCollectionId for plumbing timers to the SDK, and a map 
timerOutputPCollectionId -> fullyQualifiedTimerName for recording timers from 
the SDK (like we do here).
   
   I couldn't find a nice way to create a map timerOutputPCollectionId -> 
fullyQualifiedTimerName.




Issue Time Tracking
---

Worklog Id: (was: 165883)
Time Spent: 16.5h  (was: 16h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 16.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165873
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 12:29
Start Date: 14/Nov/18 12:29
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233426740
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +534,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
 
 Review comment:
   The name `timerCollectionId` is ambiguous, as each timer is associated with 
two distinct PCollections at execution time: one carrying timers to the SDK 
and one carrying timers back from it.
   
   Timers are absolutely determined by their transform name and local name; 
probably it makes sense to use that (let's call it fullyQualifiedTimerName) for 
the TimerData here, and then also have a mapping fullyQualifiedTimerName -> 
timerInputPCollectionId for plumbing timers to the SDK, and a map 
timerOutputPCollectionId -> fullyQualifiedTimerName for recording timers from 
the SDK (like we do here). 
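
   A minimal sketch of the two mappings suggested here, assuming the fully 
qualified name is simply the transform name and the local name joined by a dot. 
The separator, the class `TimerNameIndex`, and its method names are all 
hypothetical and not taken from Beam.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical index, not part of Beam: keeps both timer mappings in one place. */
final class TimerNameIndex {
  // fullyQualifiedTimerName -> timerInputPCollectionId (for sending timers to the SDK).
  private final Map<String, String> inputPCollectionByTimer = new HashMap<>();
  // timerOutputPCollectionId -> fullyQualifiedTimerName (for recording timers from the SDK).
  private final Map<String, String> timerByOutputPCollection = new HashMap<>();

  /** Assumption: transform name and local name joined by a dot identify a timer. */
  static String fullyQualifiedTimerName(String transformName, String localName) {
    return transformName + "." + localName;
  }

  /** Registers both directions for one timer at the same time so they cannot diverge. */
  void register(
      String transformName,
      String localName,
      String timerInputPCollectionId,
      String timerOutputPCollectionId) {
    String name = fullyQualifiedTimerName(transformName, localName);
    inputPCollectionByTimer.put(name, timerInputPCollectionId);
    timerByOutputPCollection.put(timerOutputPCollectionId, name);
  }

  /** Returns the input PCollection id for a timer, or null if the timer is unknown. */
  String inputPCollectionFor(String fullyQualifiedTimerName) {
    return inputPCollectionByTimer.get(fullyQualifiedTimerName);
  }

  /** Returns the timer name for an output PCollection id, or null if it is not a timer output. */
  String timerForOutput(String timerOutputPCollectionId) {
    return timerByOutputPCollection.get(timerOutputPCollectionId);
  }
}
```

   With such an index, the name passed to the TimerData would be the result of 
`timerForOutput(outputPCollectionId)` rather than a PCollection id, which 
removes the ambiguity between the two PCollections.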




Issue Time Tracking
---

Worklog Id: (was: 165873)
Time Spent: 16h 10m  (was: 16h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 16h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165853=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165853
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 10:16
Start Date: 14/Nov/18 10:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438610754
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165853)
Time Spent: 15h 50m  (was: 15h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 15h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165852=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165852
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 14/Nov/18 10:16
Start Date: 14/Nov/18 10:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438610701
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165852)
Time Spent: 15h 40m  (was: 15.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 15h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165677=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165677
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 22:49
Start Date: 13/Nov/18 22:49
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438468348
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165677)
Time Spent: 15h 20m  (was: 15h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 15h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165672=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165672
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 22:28
Start Date: 13/Nov/18 22:28
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438462766
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165672)
Time Spent: 15h 10m  (was: 15h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 15h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165645=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165645
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 21:19
Start Date: 13/Nov/18 21:19
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233225330
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +533,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
+setTimer(windowedValue, timerData);
+  }
+}
+  }
+}
+
+private String extractTimerPCollectionId(String outputPCollectionId) {
+  return stageBundleFactory
+  .getProcessBundleDescriptor()
+  .getProcessBundleDescriptor()
+  .getPcollectionsMap()
+  .get(outputPCollectionId)
+  .getUniqueName();
 
 Review comment:
   For example, I have `ParDo.timer.foo.out:0` which resolves to 
`ParDo.timer.output` instead of `ParDo.timer.foo`.




Issue Time Tracking
---

Worklog Id: (was: 165645)
Time Spent: 15h  (was: 14h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 15h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165644=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165644
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 21:16
Start Date: 13/Nov/18 21:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438439729
 
 
   Run Website PreCommit




Issue Time Tracking
---

Worklog Id: (was: 165644)
Time Spent: 14h 50m  (was: 14h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165643=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165643
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 21:16
Start Date: 13/Nov/18 21:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438439587
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165643)
Time Spent: 14h 40m  (was: 14.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 14h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165606=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165606
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 19:41
Start Date: 13/Nov/18 19:41
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233194809
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,13 +533,100 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+/** Key for timer which has not been registered yet. */
+Object getTimerKeyForRegistration() {
+  return keyForTimerToBeSet;
+}
+
+/** Key for timer which is about to be fired. */
+void setTimerKeyForFire(Object key) {
+  this.keyForTimerToBeFired = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String outputPCollectionId = 
Preconditions.checkNotNull(result.getKey());
+TupleTag tag = outputMap.get(outputPCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+outputPCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  final String timerCollectionId = 
extractTimerPCollectionId(outputPCollectionId);
+  TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+  Timer timer =
+  Preconditions.checkNotNull(
+  (Timer) ((KV) windowedValue.getValue()).getValue(),
+  "Received null Timer from SDK harness: %s",
+  windowedValue);
+  LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+  for (Object window : windowedValue.getWindows()) {
+StateNamespace namespace = StateNamespaces.window(windowCoder, 
(BoundedWindow) window);
+TimerInternals.TimerData timerData =
+TimerInternals.TimerData.of(
+timerCollectionId, namespace, timer.getTimestamp(), 
timerSpec.getTimeDomain());
+setTimer(windowedValue, timerData);
+  }
+}
+  }
+}
+
+private String extractTimerPCollectionId(String outputPCollectionId) {
+  return stageBundleFactory
+  .getProcessBundleDescriptor()
+  .getProcessBundleDescriptor()
+  .getPcollectionsMap()
+  .get(outputPCollectionId)
+  .getUniqueName();
 
 Review comment:
   For some reason, the name here is no longer stable after #6986.




Issue Time Tracking
---

Worklog Id: (was: 165606)
Time Spent: 14h 10m  (was: 14h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 14h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165566=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165566
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 18:33
Start Date: 13/Nov/18 18:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438385710
 
 
   Looks like a Jenkins problem, looking into it.
   
   ```
   18:55:59 Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00019a30, 1915748352, 0) failed; error='Cannot allocate memory' (errno=12)
   18:55:59 #
   18:55:59 # There is insufficient memory for the Java Runtime Environment to continue.
   18:55:59 # Native memory allocation (mmap) failed to map 1915748352 bytes for committing reserved memory.
   ```




Issue Time Tracking
---

Worklog Id: (was: 165566)
Time Spent: 13.5h  (was: 13h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165576=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165576
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 18:44
Start Date: 13/Nov/18 18:44
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438389218
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165576)
Time Spent: 14h  (was: 13h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 14h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165575=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165575
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 18:43
Start Date: 13/Nov/18 18:43
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438389186
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 165575)
Time Spent: 13h 50m  (was: 13h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 13h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165567=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165567
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 18:33
Start Date: 13/Nov/18 18:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438385817
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165567)
Time Spent: 13h 40m  (was: 13.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 13h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165556=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165556
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 18:09
Start Date: 13/Nov/18 18:09
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233161421
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord streamRecord) {
   @Override
   protected DoFnRunner createWrappingDoFnRunner(
   DoFnRunner wrappedRunner) {
-return new SdkHarnessDoFnRunner();
+sdkHarnessRunner =
+new SdkHarnessDoFnRunner<>(
+executableStage.getInputPCollection().getId(),
+stageBundleFactory,
+stateRequestHandler,
+progressHandler,
+outputManager,
+outputMap,
+executableStage.getTimers(),
+(Coder) windowingStrategy.getWindowFn().windowCoder(),
+(WindowedValue key, TimerInternals.TimerData timerData) -> {
+  try {
+synchronized (getKeyedStateBackend()) {
+  setCurrentKey(keySelector.getKey(key));
+  timerInternals.setTimer(timerData);
+}
+  } catch (Exception e) {
+throw new RuntimeException("Couldn't set key", e);
+  }
+},
+() -> {
+  synchronized (getKeyedStateBackend()) {
+ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey();
+@SuppressWarnings("ByteBufferBackingArray")
+ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+try {
+  return keyCoder.decode(byteStream);
+} catch (IOException e) {
+  throw new RuntimeException(
+  String.format(
+  Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
+}
+  }
+});
+return sdkHarnessRunner;
   }
 
-  private class SdkHarnessDoFnRunner implements DoFnRunner {
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+// Due to the asynchronous communication with the SDK harness,
+// a bundle might still be in progress and not all items have
+// yet been received from the SDK harness. If we just set this
+// watermark as the new output watermark, we could violate the
+// order of the records, i.e. pending items in the SDK harness
+// could become "late" although they were "on time".
+//
+// We can solve this problem using one of the following options:
+//
+// 1) Finish the current bundle and emit this watermark as the
+//new output watermark. Finishing the bundle ensures that
+//all the items have been processed by the SDK harness and
+//received by the outputQueue (see below), where they will
+//have been emitted to the output stream.
+//
+// 2) Put a hold on the output watermark for as long as the current
+//bundle has not been finished. We have to remember to manually
+//finish the bundle in case we receive the final watermark.
+//To avoid latency, we should process this watermark again as
+//soon as the current bundle is finished.
+//
+// Approach 1) is the easiest, yet 2) gives better throughput due
+// to the bundle not getting cut on every watermark. So we have
+// implemented 2) below.
+//
+if (sdkHarnessRunner.isBundleInProgress()) {
+  if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+invokeFinishBundle();
+  } else {
+// It is not safe to advance the output watermark yet, so add a hold on the current
+// output watermark.
+setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+sdkHarnessRunner.setBundleFinishedCallback(
+() -> {
+  try {
+processWatermark(mark);
+  } catch (Exception e) {
+throw new RuntimeException(
+"Failed to process pushed back watermark after finished bundle.", e);
+  }
+});
+  }
+}
+super.processWatermark(mark);
+  }
+
+  private static class SdkHarnessDoFnRunner
 
 Review comment:
   Regarding the operator as a whole and as discussed in the past, constraints 
are mostly inherited and would need to be 
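
For readers following the watermark comment in the hunk above, option 2) can be sketched outside of Beam and Flink as follows. This is only an illustration under stated assumptions: `WatermarkHoldSketch`, `startBundle`, `finishBundle` and `emittedWatermarks` are hypothetical names, a plain `long` stands in for the watermark, and the sketch defers the watermark with an early return instead of the pushed-back watermark hold that the patch uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator; none of this is Beam or Flink API. */
final class WatermarkHoldSketch {
  static final long MAX_WATERMARK = Long.MAX_VALUE;

  private boolean bundleInProgress;
  private Runnable bundleFinishedCallback;
  private final List<Long> emitted = new ArrayList<>();

  void startBundle() {
    bundleInProgress = true;
  }

  void finishBundle() {
    bundleInProgress = false;
    // Clear the callback before running it so it fires at most once.
    Runnable callback = bundleFinishedCallback;
    bundleFinishedCallback = null;
    if (callback != null) {
      callback.run();
    }
  }

  void processWatermark(long mark) {
    if (bundleInProgress) {
      if (mark >= MAX_WATERMARK) {
        // Final watermark: the bundle has to be finished now.
        finishBundle();
      } else {
        // Not safe to advance yet; replay this watermark once the bundle is done.
        bundleFinishedCallback = () -> processWatermark(mark);
        return;
      }
    }
    emitted.add(mark);
  }

  List<Long> emittedWatermarks() {
    return emitted;
  }
}
```

A later watermark that arrives while the bundle is still open simply replaces the pending callback. That is harmless in this sketch as long as watermarks only move forward, which is assumed here.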

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165554=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165554
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 18:05
Start Date: 13/Nov/18 18:05
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r233159888
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord streamRecord) {
   @Override
   protected DoFnRunner createWrappingDoFnRunner(
   DoFnRunner wrappedRunner) {
-return new SdkHarnessDoFnRunner();
+sdkHarnessRunner =
+new SdkHarnessDoFnRunner<>(
+executableStage.getInputPCollection().getId(),
+stageBundleFactory,
+stateRequestHandler,
+progressHandler,
+outputManager,
+outputMap,
+executableStage.getTimers(),
+(Coder) windowingStrategy.getWindowFn().windowCoder(),
+(WindowedValue key, TimerInternals.TimerData timerData) -> {
+  try {
+synchronized (getKeyedStateBackend()) {
+  setCurrentKey(keySelector.getKey(key));
+  timerInternals.setTimer(timerData);
+}
+  } catch (Exception e) {
+throw new RuntimeException("Couldn't set key", e);
+  }
+},
+() -> {
+  synchronized (getKeyedStateBackend()) {
+ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey();
+@SuppressWarnings("ByteBufferBackingArray")
+ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+try {
+  return keyCoder.decode(byteStream);
+} catch (IOException e) {
+  throw new RuntimeException(
+  String.format(
+  Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
+}
+  }
+});
+return sdkHarnessRunner;
   }
 
-  private class SdkHarnessDoFnRunner implements DoFnRunner {
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+// Due to the asynchronous communication with the SDK harness,
+// a bundle might still be in progress and not all items have
+// yet been received from the SDK harness. If we just set this
+// watermark as the new output watermark, we could violate the
+// order of the records, i.e. pending items in the SDK harness
+// could become "late" although they were "on time".
+//
+// We can solve this problem using one of the following options:
+//
+// 1) Finish the current bundle and emit this watermark as the
+//new output watermark. Finishing the bundle ensures that
+//all the items have been processed by the SDK harness and
+//received by the outputQueue (see below), where they will
+//have been emitted to the output stream.
+//
+// 2) Put a hold on the output watermark for as long as the current
+//bundle has not been finished. We have to remember to manually
+//finish the bundle in case we receive the final watermark.
+//To avoid latency, we should process this watermark again as
+//soon as the current bundle is finished.
+//
+// Approach 1) is the easiest, yet 2) gives better throughput due
+// to the bundle not getting cut on every watermark. So we have
+// implemented 2) below.
+//
+if (sdkHarnessRunner.isBundleInProgress()) {
+  if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+invokeFinishBundle();
+  } else {
+// It is not safe to advance the output watermark yet, so add a hold on the current
+// output watermark.
+setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+sdkHarnessRunner.setBundleFinishedCallback(
+() -> {
+  try {
+processWatermark(mark);
+  } catch (Exception e) {
+throw new RuntimeException(
+"Failed to process pushed back watermark after finished bundle.", e);
+  }
+});
+  }
+}
+super.processWatermark(mark);
+  }
+
+  private static class SdkHarnessDoFnRunner
 
 Review comment:
   I would agree with that if `SdkHarnessDoFnRunner` was a component with a 
prospect of being used independently. Instead it is 
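
As a side note on the key supplier in the hunk above, which reads `encodedKey.array()` and suppresses the `ByteBufferBackingArray` warning: `array()` returns the whole backing array regardless of the buffer's position and limit. Whether the state backend ever hands out buffers with an offset is not stated in this thread, so the following is only a sketch of a more defensive read. `EncodedKeySketch`, `remainingBytes` and `decodeUtf8Key` are hypothetical names, and UTF-8 decoding merely stands in for `keyCoder.decode`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; not part of the patch under review. */
final class EncodedKeySketch {

  /** Copies only the bytes between position and limit, leaving the buffer untouched. */
  static byte[] remainingBytes(ByteBuffer buffer) {
    // duplicate() shares the content but has its own position and limit.
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  /** Stand-in for keyCoder.decode: interprets the remaining bytes as UTF-8. */
  static String decodeUtf8Key(ByteBuffer encodedKey) {
    return new String(remainingBytes(encodedKey), StandardCharsets.UTF_8);
  }
}
```

For a buffer created with `ByteBuffer.wrap(bytes, 2, 3)`, `array()` would expose all of `bytes`, while `remainingBytes` returns only the three bytes starting at index 2.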

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165526=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165526
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 17:54
Start Date: 13/Nov/18 17:54
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438371397
 
 
   Run Python Flink ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165526)
Time Spent: 13h  (was: 12h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 13h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165525=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165525
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 17:50
Start Date: 13/Nov/18 17:50
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438370184
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165525)
Time Spent: 12h 50m  (was: 12h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 12h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165511
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 17:32
Start Date: 13/Nov/18 17:32
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438363264
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165511)
Time Spent: 12h 20m  (was: 12h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165514=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165514
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 17:34
Start Date: 13/Nov/18 17:34
Worklog Time Spent: 10m 
  Work Description: mxm removed a comment on issue #7008: [BEAM-4681] Add 
support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438363264
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165514)
Time Spent: 12h 40m  (was: 12.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 12h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165513=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165513
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 17:34
Start Date: 13/Nov/18 17:34
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438364394
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165513)
Time Spent: 12.5h  (was: 12h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165422=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165422
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 13:15
Start Date: 13/Nov/18 13:15
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438261605
 
 
   Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165422)
Time Spent: 12h  (was: 11h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 12h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165421=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165421
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 13:15
Start Date: 13/Nov/18 13:15
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6981: [BEAM-4681] Add support 
for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438261549
 
 
   I've addressed your comments @tweise.




Issue Time Tracking
---

Worklog Id: (was: 165421)
Time Spent: 11h 50m  (was: 11h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165420=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165420
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 13:14
Start Date: 13/Nov/18 13:14
Worklog Time Spent: 10m 
  Work Description: mxm removed a comment on issue #6981: [BEAM-4681] Add 
support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#issuecomment-438261209
 
 
Run Java Flink PortableValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 165420)
Time Spent: 11h 40m  (was: 11.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165375=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165375
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 11:27
Start Date: 13/Nov/18 11:27
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438232854
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 165375)
Time Spent: 11h  (was: 10h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 11h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165376=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165376
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 11:27
Start Date: 13/Nov/18 11:27
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438232874
 
 
   Run Java_Examples_Dataflow PreCommit




Issue Time Tracking
---

Worklog Id: (was: 165376)
Time Spent: 11h 10m  (was: 11h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 11h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165374=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165374
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 11:25
Start Date: 13/Nov/18 11:25
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232998285
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,16 +534,86 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+void setTimerKey(Object key) {
+  this.beforeFireTimerKey = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+final String inputCollectionId = result.getKey();
+TupleTag tag = outputMap.get(inputCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+inputCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  // process timer elements
+  // TODO This is ugly. There should be an easier way to retrieve the
+  String timerPCollectionId =
+  inputCollectionId.substring(0, inputCollectionId.length() - 
".out:0".length());
+  TimerReference timerReference = 
timerReferenceMap.get(timerPCollectionId);
+  if (timerReference != null) {
+Timer timer =
+Preconditions.checkNotNull(
+(Timer) ((KV) windowedValue.getValue()).getValue(),
+"Received null Timer from SDK harness: %s",
+windowedValue);
+LOG.debug("Timer received: {} {}", inputCollectionId, timer);
+for (Object window : windowedValue.getWindows()) {
+  StateNamespace namespace =
+  StateNamespaces.window(windowCoder, (BoundedWindow) window);
+  TimerSpec timerSpec = extractTimerSpec(timerReference);
+  TimerInternals.TimerData timerData =
+  TimerInternals.TimerData.of(
+  timerPCollectionId,
+  namespace,
+  timer.getTimestamp(),
+  timerSpec.getTimeDomain());
+  timerDataConsumer.accept(windowedValue, timerData);
 
 Review comment:
   Pushed a commit which removes the BiConsumer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 165374)
Time Spent: 10h 50m  (was: 10h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 10h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165363&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165363
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 11:01
Start Date: 13/Nov/18 11:01
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232990625
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -304,16 +342,141 @@ protected void 
addSideInputValue(StreamRecord streamRecord) {
   @Override
   protected DoFnRunner createWrappingDoFnRunner(
   DoFnRunner wrappedRunner) {
-return new SdkHarnessDoFnRunner();
+sdkHarnessRunner =
+new SdkHarnessDoFnRunner<>(
+executableStage.getInputPCollection().getId(),
+stageBundleFactory,
+stateRequestHandler,
+progressHandler,
+outputManager,
+outputMap,
+executableStage.getTimers(),
+(Coder) 
windowingStrategy.getWindowFn().windowCoder(),
+(WindowedValue key, TimerInternals.TimerData timerData) -> 
{
+  try {
+synchronized (getKeyedStateBackend()) {
+  setCurrentKey(keySelector.getKey(key));
+  timerInternals.setTimer(timerData);
+}
+  } catch (Exception e) {
+throw new RuntimeException("Couldn't set key", e);
+  }
+},
+() -> {
+  synchronized (getKeyedStateBackend()) {
+ByteBuffer encodedKey = (ByteBuffer) 
getKeyedStateBackend().getCurrentKey();
+@SuppressWarnings("ByteBufferBackingArray")
+ByteArrayInputStream byteStream = new 
ByteArrayInputStream(encodedKey.array());
+try {
+  return keyCoder.decode(byteStream);
+} catch (IOException e) {
+  throw new RuntimeException(
+  String.format(
+  Locale.ENGLISH, "Failed to decode encoded key: %s", 
encodedKey));
+}
+  }
+});
+return sdkHarnessRunner;
   }
 
-  private class SdkHarnessDoFnRunner implements DoFnRunner {
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+// Due to the asynchronous communication with the SDK harness,
+// a bundle might still be in progress and not all items have
+// yet been received from the SDK harness. If we just set this
+// watermark as the new output watermark, we could violate the
+// order of the records, i.e. pending items in the SDK harness
+// could become "late" although they were "on time".
+//
+// We can solve this problem using one of the following options:
+//
+// 1) Finish the current bundle and emit this watermark as the
+//new output watermark. Finishing the bundle ensures that
+//all the items have been processed by the SDK harness and
+//received by the outputQueue (see below), where they will
+//have been emitted to the output stream.
+//
+// 2) Put a hold on the output watermark for as long as the current
+//bundle has not been finished. We have to remember to manually
+//finish the bundle in case we receive the final watermark.
+//To avoid latency, we should process this watermark again as
+//soon as the current bundle is finished.
+//
+// Approach 1) is the easiest, but it cuts the bundle on every
+// watermark. Approach 2) avoids that and gives better throughput,
+// so we have implemented 2) below.
+//
+if (sdkHarnessRunner.isBundleInProgress()) {
+  if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+invokeFinishBundle();
+  } else {
+// It is not safe to advance the output watermark yet, so add a hold on the current
+// output watermark.
+setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+sdkHarnessRunner.setBundleFinishedCallback(
+() -> {
+  try {
+processWatermark(mark);
+  } catch (Exception e) {
+throw new RuntimeException(
+"Failed to process pushed back watermark after finished bundle.", e);
+  }
+});
+  }
+}
+super.processWatermark(mark);
+  }
+
+  private static class SdkHarnessDoFnRunner
 
 Review comment:
   We could move this out of the operator now which would increase readability. 
If we don't split it into a separate static class, 

[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165357&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165357
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:39
Start Date: 13/Nov/18 10:39
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232982588
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2839,19 +2841,27 @@ public void testAbsoluteProcessingTimeTimerRejected() 
throws Exception {
 
 @ProcessElement
 public void processElement(@TimerId(timerId) Timer timer) {
-  timer.set(new Instant(0));
+  try {
+timer.set(new Instant(0));
+fail("Should have failed due to processing time with absolute 
timer.");
 
 Review comment:
   Yes, but not in processing time: 
https://github.com/apache/beam/commit/86bcf26c965f876be2e67aea769de3406ef25a97#diff-95b3ede99105b81444964de712a52386R967




Issue Time Tracking
---

Worklog Id: (was: 165357)
Time Spent: 10.5h  (was: 10h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165355&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165355
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:33
Start Date: 13/Nov/18 10:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232980512
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -304,16 +344,126 @@ protected void 
addSideInputValue(StreamRecord streamRecord) {
   @Override
   protected DoFnRunner createWrappingDoFnRunner(
   DoFnRunner wrappedRunner) {
-return new SdkHarnessDoFnRunner();
+sdkHarnessRunner =
+new SdkHarnessDoFnRunner<>(
+executableStage.getInputPCollection().getId(),
+stageBundleFactory,
+stateRequestHandler,
+progressHandler,
+outputManager,
+outputMap,
+executableStage.getTimers(),
+(Coder) 
windowingStrategy.getWindowFn().windowCoder(),
+(WindowedValue key, TimerInternals.TimerData timerData) -> 
{
+  try {
+keyForTimerToBeSet = keySelector.getKey(key);
+timerInternals.setTimer(timerData);
+  } catch (Exception e) {
+throw new RuntimeException("Couldn't set timer", e);
+  } finally {
+keyForTimerToBeSet = null;
+  }
+});
+return sdkHarnessRunner;
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+// Due to the asynchronous communication with the SDK harness,
+// a bundle might still be in progress and not all items have
+// yet been received from the SDK harness. If we just set this
+// watermark as the new output watermark, we could violate the
+// order of the records, i.e. pending items in the SDK harness
+// could become "late" although they were "on time".
+//
+// We can solve this problem using one of the following options:
+//
+// 1) Finish the current bundle and emit this watermark as the
+//new output watermark. Finishing the bundle ensures that
+//all the items have been processed by the SDK harness and
+//received by the outputQueue (see below), where they will
+//have been emitted to the output stream.
+//
+// 2) Put a hold on the output watermark for as long as the current
+//bundle has not been finished. We have to remember to manually
+//finish the bundle in case we receive the final watermark.
+//To avoid latency, we should process this watermark again as
+//soon as the current bundle is finished.
+//
+// Approach 1) is the easiest, yet 2) gives better throughput due
 
 Review comment:
   Clarifying this.
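
   For readers following the thread, the hold-and-replay idea described as
   option 2) in the quoted comment can be sketched in isolation. This is a
   minimal illustration under stated assumptions, not the operator's actual
   code: the class WatermarkHoldSketch, its fields, and the use of
   Long.MAX_VALUE as the final watermark are hypothetical stand-ins, and the
   sketch simply defers emission rather than modelling Flink's pushed-back
   watermark hold.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for an operator; all names are hypothetical, not Beam or Flink API. */
final class WatermarkHoldSketch {
  /** Assumed marker for the final watermark in this sketch. */
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private boolean bundleInProgress;
  private Runnable bundleFinishedCallback;

  /** Watermarks that were actually forwarded downstream, in order. */
  final List<Long> emittedWatermarks = new ArrayList<>();

  void startBundle() {
    bundleInProgress = true;
  }

  void finishBundle() {
    bundleInProgress = false;
    // Clear the callback before running it so it fires at most once.
    Runnable callback = bundleFinishedCallback;
    bundleFinishedCallback = null;
    if (callback != null) {
      callback.run();
    }
  }

  void processWatermark(long watermark) {
    if (bundleInProgress) {
      if (watermark >= FINAL_WATERMARK) {
        // The final watermark must not be held back: finish the bundle now.
        finishBundle();
      } else {
        // Hold the watermark and replay it once the bundle has finished.
        bundleFinishedCallback = () -> processWatermark(watermark);
        return;
      }
    }
    emittedWatermarks.add(watermark);
  }
}
```

   In this sketch a watermark that arrives while a bundle is open is not
   forwarded until finishBundle() runs, so elements still pending in the
   bundle cannot be overtaken by it.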




Issue Time Tracking
---

Worklog Id: (was: 165355)
Time Spent: 10h 10m  (was: 10h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165356
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:33
Start Date: 13/Nov/18 10:33
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232980538
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##
 @@ -442,22 +443,21 @@ public void close() throws Exception {
 }
   }
 
-  private long getPushbackWatermarkHold() {
+  long getPushbackWatermarkHold() {
 return pushedBackWatermark;
   }
 
+  void setPushedBackWatermark(long watermark) {
 
 Review comment:
   will change




Issue Time Tracking
---

Worklog Id: (was: 165356)
Time Spent: 10h 20m  (was: 10h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165354
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:32
Start Date: 13/Nov/18 10:32
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232980441
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,16 +534,86 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+void setTimerKey(Object key) {
+  this.beforeFireTimerKey = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String inputCollectionId = result.getKey();
+TupleTag tag = outputMap.get(inputCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+inputCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  // process timer elements
+  // TODO This is ugly. There should be an easier way to retrieve the
+  String timerPCollectionId =
+  inputCollectionId.substring(0, inputCollectionId.length() - 
".out:0".length());
+  TimerReference timerReference = 
timerReferenceMap.get(timerPCollectionId);
+  if (timerReference != null) {
+Timer timer =
+Preconditions.checkNotNull(
+(Timer) ((KV) windowedValue.getValue()).getValue(),
+"Received null Timer from SDK harness: %s",
+windowedValue);
+LOG.debug("Timer received: {} {}", inputCollectionId, timer);
+for (Object window : windowedValue.getWindows()) {
+  StateNamespace namespace =
+  StateNamespaces.window(windowCoder, (BoundedWindow) window);
+  TimerSpec timerSpec = extractTimerSpec(timerReference);
+  TimerInternals.TimerData timerData =
+  TimerInternals.TimerData.of(
+  timerPCollectionId,
+  namespace,
+  timer.getTimestamp(),
+  timerSpec.getTimeDomain());
+  timerDataConsumer.accept(windowedValue, timerData);
 
 Review comment:
   I agree it can be confusing. On the other hand, this decouples the timer 
generation code and the timer consumer code.
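
   The decoupling argued for here can be illustrated with a small sketch. It
   assumes a drastically simplified timer (a String key plus a timestamp in
   milliseconds); TimerEmitterSketch and onTimer are hypothetical names, and
   only java.util.function.BiConsumer is a real API.

```java
import java.util.function.BiConsumer;

/** Hypothetical producer: it knows how timers are built, not what happens to them. */
final class TimerEmitterSketch {
  private final BiConsumer<String, Long> timerConsumer;

  TimerEmitterSketch(BiConsumer<String, Long> timerConsumer) {
    this.timerConsumer = timerConsumer;
  }

  /** Forwards one received (key, timestamp) pair to whatever consumer was injected. */
  void onTimer(String key, long timestampMillis) {
    timerConsumer.accept(key, timestampMillis);
  }
}
```

   A caller would pass in the consuming side as a lambda, for example one
   that registers the timer with the runner, while a test could pass a lambda
   that only records what it receives. The cost noted in the review is that
   the control flow is harder to follow than a direct method call.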




Issue Time Tracking
---

Worklog Id: (was: 165354)
Time Spent: 10h  (was: 9h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 10h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165353
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:32
Start Date: 13/Nov/18 10:32
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6981: 
[BEAM-4681] Add support for portable timers in Flink streaming mode 
URL: https://github.com/apache/beam/pull/6981#discussion_r232980105
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 ##
 @@ -225,18 +229,22 @@ public String getId() {
   @Override
   public Map>> 
getInputReceivers() {
 return ImmutableMap.of(
-"pCollectionId",
+"input",
 input -> {
   /* Ignore input*/
 });
   }
 
   @Override
   public void close() throws Exception {
+if (onceEmitted) {
 
 Review comment:
   Close is only called once per bundle but there are two bundles now due to 
the watermark processing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 165353)
Time Spent: 9h 50m  (was: 9h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165352&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165352
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:19
Start Date: 13/Nov/18 10:19
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7008: [BEAM-4681] Add support 
for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#issuecomment-438212646
 
 
   Thanks for the review @ryan-williams!




Issue Time Tracking
---

Worklog Id: (was: 165352)
Time Spent: 9h 40m  (was: 9.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165351
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:18
Start Date: 13/Nov/18 10:18
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232975079
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -270,8 +282,36 @@ public void setKeyContextElement1(StreamRecord record) 
throws Exception {
 
   @Override
   public void setCurrentKey(Object key) {
-throw new UnsupportedOperationException(
-"Current key for state backend can only be set by state requests from 
SDK workers.");
+// We don't need to set anything, the key is set manually on the state 
backend
+// This will be called by HeapInternalTimerService before a timer is fired
+if (!usesTimers) {
+  throw new UnsupportedOperationException(
+  "Current key for state backend can only be set by state requests 
from SDK workers or when processing timers.");
+}
+  }
+
+  @Override
+  public Object getCurrentKey() {
+// This is the key retrieved by HeapInternalTimerService when setting a 
Flink timer
+return keyForTimerToBeSet;
+  }
+
+  @Override
+  public void fireTimer(InternalTimer timer) {
+// We need to decode the key
+final ByteBuffer encodedKey = (ByteBuffer) timer.getKey();
+@SuppressWarnings("ByteBufferBackingArray")
+ByteArrayInputStream byteStream = new 
ByteArrayInputStream(encodedKey.array());
+final Object decodedKey;
+try {
+  decodedKey = keyCoder.decode(byteStream);
+} catch (IOException e) {
+  throw new RuntimeException(
+  String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", 
encodedKey));
 
 Review comment:
   Addressing this in #6981.
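
   As a side note on the quoted diff: it reads the key bytes through
   encodedKey.array(). ByteBuffer.array() returns the whole backing array and
   ignores the buffer's position, limit, and array offset, so it matches the
   buffer's content only when the buffer spans the full array. A hedged
   sketch of a copy that respects those bounds follows. ByteBufferKeys and
   toByteArray are hypothetical names, and this is not necessarily what #6981
   ended up doing.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of Beam. */
final class ByteBufferKeys {
  private ByteBufferKeys() {}

  /** Copies the bytes between position and limit without moving the caller's position. */
  static byte[] toByteArray(ByteBuffer buffer) {
    // duplicate() shares the content but has its own position and limit.
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }
}
```

   The resulting array could then be wrapped in a ByteArrayInputStream and
   handed to the key coder, as the diff does with the backing array.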




Issue Time Tracking
---

Worklog Id: (was: 165351)
Time Spent: 9.5h  (was: 9h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165345&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165345
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:00
Start Date: 13/Nov/18 10:00
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232968422
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,16 +534,86 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+void setTimerKey(Object key) {
+  this.beforeFireTimerKey = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String inputCollectionId = result.getKey();
+TupleTag tag = outputMap.get(inputCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+inputCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  // process timer elements
+  // TODO This is ugly. There should be an easier way to retrieve the
 
 Review comment:
   Timer pCollection id, will change in #6981.




Issue Time Tracking
---

Worklog Id: (was: 165345)
Time Spent: 9h 10m  (was: 9h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165342&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165342
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 09:57
Start Date: 13/Nov/18 09:57
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232967092
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,23 +329,103 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map 
outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", 
collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
+  }
+};
+  } else if (timerReceiverFactory != null) {
+// Delegate to TimerReceiverFactory
+return timerReceiverFactory.create(collectionId);
+  } else {
+throw new IllegalStateException(
+String.format(Locale.ENGLISH, "Unknown PCollectionId %s", 
collectionId));
+  }
+}
+  }
+
+  private static class TimerReceiverFactory implements OutputReceiverFactory {
+
+/** Timer PCollection id => TimerReference. */
+private final HashMap timerReferenceMap;
+/** Timer PCollection id => timer name => TimerSpec. */
+private final Map> 
timerSpecMap;
+
+private final BiConsumer 
timerDataConsumer;
+private final Coder windowCoder;
+
+TimerReceiverFactory(
+Collection timerReferenceCollection,
+Map> 
timerSpecMap,
+BiConsumer timerDataConsumer,
+Coder windowCoder) {
+  this.timerReferenceMap = new HashMap<>();
+  for (TimerReference timerReference : timerReferenceCollection) {
+timerReferenceMap.put(timerReference.collection().getId(), 
timerReference);
+  }
+  this.timerSpecMap = timerSpecMap;
+  this.timerDataConsumer = timerDataConsumer;
+  this.windowCoder = windowCoder;
+}
+
+@Override
+public  FnDataReceiver create(String pCollectionId) {
+  // TODO This is ugly. There should be an easier way to retrieve the 
timer collectionid
+  String timerPCollectionId =
+  pCollectionId.substring(0, pCollectionId.length() - 
".out:0".length());
 
 Review comment:
   If it happens, it is definitely a bug. I suppose we could add a length-check 
to see if we can safely substring. The Precondition check afterwards should be 
sufficient to print a good error message.
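
   The length check suggested above could look roughly like the following
   sketch. It assumes the ".out:0" suffix from the quoted diff is a fixed
   convention; TimerCollectionIds and stripOutputSuffix are hypothetical
   names, not existing Beam code. Using endsWith covers the length check
   implicitly, because a string shorter than the suffix cannot end with it.

```java
import java.util.Objects;

/** Hypothetical helper, not part of Beam. */
final class TimerCollectionIds {
  /** Suffix taken from the quoted diff; assumed to be a fixed convention. */
  static final String OUTPUT_SUFFIX = ".out:0";

  private TimerCollectionIds() {}

  /** Returns the id without the output suffix, or fails with a descriptive message. */
  static String stripOutputSuffix(String pCollectionId) {
    Objects.requireNonNull(pCollectionId, "pCollectionId");
    if (!pCollectionId.endsWith(OUTPUT_SUFFIX)) {
      throw new IllegalArgumentException(
          String.format(
              "Expected a PCollection id ending in '%s' but got '%s'",
              OUTPUT_SUFFIX, pCollectionId));
    }
    return pCollectionId.substring(0, pCollectionId.length() - OUTPUT_SUFFIX.length());
  }
}
```

   Without such a check, an id shorter than the suffix would make substring
   throw a StringIndexOutOfBoundsException that does not mention the id.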




Issue Time Tracking
---

Worklog Id: (was: 165342)
Time Spent: 9h  (was: 8h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 9h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165346&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165346
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 10:00
Start Date: 13/Nov/18 10:00
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232968493
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
 ##
 @@ -85,17 +86,38 @@
   .setInput("input")
   .setComponents(
   Components.newBuilder()
+  .putTransforms(
+  "transform",
+  RunnerApi.PTransform.newBuilder()
+  .putInputs("bla", "input")
+  
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PAR_DO_TRANSFORM_URN))
+  .build())
   .putPcollections("input", PCollection.getDefaultInstance())
   .build())
+  .addUserStates(
+  
ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build())
   .build();
   private final JobInfo jobInfo =
   JobInfo.create("job-id", "job-name", "retrieval-token", 
Struct.getDefaultInstance());
 
   @Before
-  public void setUpMocks() {
+  public void setUpMocks() throws Exception {
 MockitoAnnotations.initMocks(this);
 when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
 
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class);
+when(stageBundleFactory.getBundle(any(), any(), 
any())).thenReturn(remoteBundle);
+//ProcessBundleDescriptors.ExecutableProcessBundleDescriptor 
processBundleDescriptor =
 
 Review comment:
   Nope, thanks for spotting.




Issue Time Tracking
---

Worklog Id: (was: 165346)
Time Spent: 9h 20m  (was: 9h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165341=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165341
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 09:57
Start Date: 13/Nov/18 09:57
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232967092
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,23 +329,103 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map 
outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", 
collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
+  }
+};
+  } else if (timerReceiverFactory != null) {
+// Delegate to TimerReceiverFactory
+return timerReceiverFactory.create(collectionId);
+  } else {
+throw new IllegalStateException(
+String.format(Locale.ENGLISH, "Unknown PCollectionId %s", 
collectionId));
+  }
+}
+  }
+
+  private static class TimerReceiverFactory implements OutputReceiverFactory {
+
+/** Timer PCollection id => TimerReference. */
+private final HashMap timerReferenceMap;
+/** Timer PCollection id => timer name => TimerSpec. */
+private final Map> 
timerSpecMap;
+
+private final BiConsumer 
timerDataConsumer;
+private final Coder windowCoder;
+
+TimerReceiverFactory(
+Collection timerReferenceCollection,
+Map> 
timerSpecMap,
+BiConsumer timerDataConsumer,
+Coder windowCoder) {
+  this.timerReferenceMap = new HashMap<>();
+  for (TimerReference timerReference : timerReferenceCollection) {
+timerReferenceMap.put(timerReference.collection().getId(), 
timerReference);
+  }
+  this.timerSpecMap = timerSpecMap;
+  this.timerDataConsumer = timerDataConsumer;
+  this.windowCoder = windowCoder;
+}
+
+@Override
+public  FnDataReceiver create(String pCollectionId) {
+  // TODO This is ugly. There should be an easier way to retrieve the timer collectionid
+  String timerPCollectionId =
+  pCollectionId.substring(0, pCollectionId.length() - ".out:0".length());
 
 Review comment:
   If it happens, it is definitely a bug. I suppose we could add a length-check 
to see if we can safely substring. The Precondition check afterwards should be 
sufficient to print a good error message.
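
   A minimal sketch of the guarded substring discussed above. The class and 
method names (`TimerCollectionIds`, `stripOutputSuffix`) are hypothetical and 
not part of the pull request; only the `.out:0` suffix is taken from the diff. 
It uses `endsWith`, which is stricter than a plain length check, so a malformed 
id fails with a readable message instead of a 
`StringIndexOutOfBoundsException`.

```java
/** Hypothetical helper, not part of the PR: derives the timer collection id. */
final class TimerCollectionIds {
  // Suffix copied from the diff under review.
  private static final String OUTPUT_SUFFIX = ".out:0";

  private TimerCollectionIds() {}

  /**
   * Strips the trailing ".out:0" from a PCollection id.
   * endsWith() implies the length check, so substring() cannot go out of range.
   */
  static String stripOutputSuffix(String pCollectionId) {
    if (pCollectionId == null || !pCollectionId.endsWith(OUTPUT_SUFFIX)) {
      throw new IllegalArgumentException(
          String.format(
              "Expected a PCollection id ending in '%s' but got: %s",
              OUTPUT_SUFFIX, pCollectionId));
    }
    return pCollectionId.substring(0, pCollectionId.length() - OUTPUT_SUFFIX.length());
  }
}
```

   Whether to fail here or leave the error to the later Precondition check is a 
design choice; this variant fails early and names the offending id.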




Issue Time Tracking
---

Worklog Id: (was: 165341)
Time Spent: 8h 50m  (was: 8h 40m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165339=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165339
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 09:56
Start Date: 13/Nov/18 09:56
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232967092
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##
 @@ -230,23 +329,103 @@ public void close() throws Exception {
 private final Collector collector;
 
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 
 ReceiverFactory(Collector collector, Map 
outputMap) {
+  this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
   this.collector = collector;
   this.outputMap = outputMap;
+  this.timerReceiverFactory = timerReceiverFactory;
 }
 
 @Override
 public  FnDataReceiver create(String collectionId) {
   Integer unionTag = outputMap.get(collectionId);
-  checkArgument(unionTag != null, "Unknown PCollection id: %s", 
collectionId);
-  int tagInt = unionTag;
+  if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+  synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));
+  }
+};
+  } else if (timerReceiverFactory != null) {
+// Delegate to TimerReceiverFactory
+return timerReceiverFactory.create(collectionId);
+  } else {
+throw new IllegalStateException(
+String.format(Locale.ENGLISH, "Unknown PCollectionId %s", 
collectionId));
+  }
+}
+  }
+
+  private static class TimerReceiverFactory implements OutputReceiverFactory {
+
+/** Timer PCollection id => TimerReference. */
+private final HashMap timerReferenceMap;
+/** Timer PCollection id => timer name => TimerSpec. */
+private final Map> 
timerSpecMap;
+
+private final BiConsumer 
timerDataConsumer;
+private final Coder windowCoder;
+
+TimerReceiverFactory(
+Collection timerReferenceCollection,
+Map> 
timerSpecMap,
+BiConsumer timerDataConsumer,
+Coder windowCoder) {
+  this.timerReferenceMap = new HashMap<>();
+  for (TimerReference timerReference : timerReferenceCollection) {
+timerReferenceMap.put(timerReference.collection().getId(), 
timerReference);
+  }
+  this.timerSpecMap = timerSpecMap;
+  this.timerDataConsumer = timerDataConsumer;
+  this.windowCoder = windowCoder;
+}
+
+@Override
+public  FnDataReceiver create(String pCollectionId) {
+  // TODO This is ugly. There should be an easier way to retrieve the timer collectionid
+  String timerPCollectionId =
+  pCollectionId.substring(0, pCollectionId.length() - ".out:0".length());
 
 Review comment:
   If it happens, it is definitely a bug. I suppose we could add a length-check 
to see if we can safely substring. The Precondition check AFAIK is enough 
afterwards.




Issue Time Tracking
---

Worklog Id: (was: 165339)
Time Spent: 8h 40m  (was: 8.5h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165338
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 09:56
Start Date: 13/Nov/18 09:56
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232967073
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -270,8 +282,36 @@ public void setKeyContextElement1(StreamRecord record) 
throws Exception {
 
   @Override
   public void setCurrentKey(Object key) {
-throw new UnsupportedOperationException(
-"Current key for state backend can only be set by state requests from SDK workers.");
+// We don't need to set anything, the key is set manually on the state backend
+// This will be called by HeapInternalTimerService before a timer is fired
+if (!usesTimers) {
+  throw new UnsupportedOperationException(
+  "Current key for state backend can only be set by state requests from SDK workers or when processing timers.");
+}
+  }
+
+  @Override
+  public Object getCurrentKey() {
+// This is the key retrieved by HeapInternalTimerService when setting a Flink timer
+return keyForTimerToBeSet;
+  }
+
+  @Override
+  public void fireTimer(InternalTimer timer) {
+// We need to decode the key
+final ByteBuffer encodedKey = (ByteBuffer) timer.getKey();
+@SuppressWarnings("ByteBufferBackingArray")
+ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array());
+final Object decodedKey;
+try {
+  decodedKey = keyCoder.decode(byteStream);
+} catch (IOException e) {
+  throw new RuntimeException(
+  String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey));
 
 Review comment:
   I think we could do `Arrays.toString(buffer.array())`.
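
   A rough sketch of what that could look like. The class and method names 
(`KeyDecodeMessages`, `describeFailure`) are hypothetical and not from the pull 
request. Formatting a `ByteBuffer` with `%s` prints only its position, limit and 
capacity, whereas `Arrays.toString` on the backing array prints the bytes. The 
`hasArray()` guard is an added assumption, since `array()` throws for buffers 
without an accessible backing array.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Locale;

/** Hypothetical helper, not part of the PR: builds the decode-failure message. */
final class KeyDecodeMessages {
  private KeyDecodeMessages() {}

  static String describeFailure(ByteBuffer encodedKey) {
    // Print the raw bytes when a backing array is accessible; otherwise fall
    // back to ByteBuffer.toString(), which shows only pos/lim/cap.
    String bytes =
        encodedKey.hasArray() ? Arrays.toString(encodedKey.array()) : encodedKey.toString();
    return String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", bytes);
  }
}
```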




Issue Time Tracking
---

Worklog Id: (was: 165338)
Time Spent: 8.5h  (was: 8h 20m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165288=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165288
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 03:56
Start Date: 13/Nov/18 03:56
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#7008: [BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232889550
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -359,16 +534,86 @@ public void finishBundle() {
 emitResults();
   } catch (Exception e) {
 throw new RuntimeException("Failed to finish remote bundle", e);
+  } finally {
+remoteBundle = null;
+  }
+  if (bundleFinishedCallback != null) {
+bundleFinishedCallback.run();
+bundleFinishedCallback = null;
   }
 }
 
+void setTimerKey(Object key) {
+  this.beforeFireTimerKey = key;
+}
+
+boolean isBundleInProgress() {
+  return remoteBundle != null;
+}
+
+void setBundleFinishedCallback(Runnable callback) {
+  this.bundleFinishedCallback = callback;
+}
+
 private void emitResults() {
   KV result;
   while ((result = outputQueue.poll()) != null) {
-outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
+final String inputCollectionId = result.getKey();
+TupleTag tag = outputMap.get(inputCollectionId);
+WindowedValue windowedValue =
+Preconditions.checkNotNull(
+(WindowedValue) result.getValue(),
+"Received a null value from the SDK harness for %s",
+inputCollectionId);
+if (tag != null) {
+  // process regular elements
+  outputManager.output(tag, windowedValue);
+} else {
+  // process timer elements
+  // TODO This is ugly. There should be an easier way to retrieve the
 
 Review comment:
   not sure if this is right but:
   ```suggestion
 // TODO This is ugly. There should be an easier way to retrieve the timer ID
   ```




Issue Time Tracking
---

Worklog Id: (was: 165288)
Time Spent: 8h 10m  (was: 8h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.





[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink

2018-11-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165286=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165286
 ]

ASF GitHub Bot logged work on BEAM-4681:


Author: ASF GitHub Bot
Created on: 13/Nov/18 03:56
Start Date: 13/Nov/18 03:56
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#7008: [BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r232890143
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
 ##
 @@ -85,17 +86,38 @@
   .setInput("input")
   .setComponents(
   Components.newBuilder()
+  .putTransforms(
+  "transform",
+  RunnerApi.PTransform.newBuilder()
+  .putInputs("bla", "input")
+  
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PAR_DO_TRANSFORM_URN))
+  .build())
   .putPcollections("input", PCollection.getDefaultInstance())
   .build())
+  .addUserStates(
+  
ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build())
   .build();
   private final JobInfo jobInfo =
   JobInfo.create("job-id", "job-name", "retrieval-token", 
Struct.getDefaultInstance());
 
   @Before
-  public void setUpMocks() {
+  public void setUpMocks() throws Exception {
 MockitoAnnotations.initMocks(this);
 when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
 
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class);
+when(stageBundleFactory.getBundle(any(), any(), 
any())).thenReturn(remoteBundle);
+//ProcessBundleDescriptors.ExecutableProcessBundleDescriptor 
processBundleDescriptor =
 
 Review comment:
   intentionally leaving these here?




Issue Time Tracking
---

Worklog Id: (was: 165286)
Time Spent: 8h  (was: 7h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Luke Cwik
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 8h
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.




