[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179679 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 29/Dec/18 11:47
Start Date: 29/Dec/18 11:47
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7359: [BEAM-4681] Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#issuecomment-450487164

> Out of curiosity, how did you discover this issue? Might be worth to add a comment to the JIRA ticket.

I found the issue while writing the test for #7362. The test solely uses timers in the first operator, without any state requests. The state requests would otherwise initialize the state backend key which is required for keying the pending timers map state. I got a "key not initalized" error. In cases with a key initialized by the state requests, this would have caused pending timers to be associated with the wrong key which would have led to problems deleting / resetting them.

Adding this also to the JIRA.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 179679)
Time Spent: 26h 10m (was: 26h)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Luke Cwik
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability, portability-flink
> Fix For: 2.9.0
>
> Time Spent: 26h 10m
> Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
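The two failure modes described in the comment above (no key set at all, and a stale key left over from an earlier state request) can be illustrated with a toy keyed store. This is a minimal sketch and not Beam or Flink code: `ToyKeyedBackend`, `setCurrentKey`, `pendingTimers`, and the exception message are hypothetical names made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a keyed state backend; not the Flink API. */
class ToyKeyedBackend {
  private String currentKey; // stays null until some caller sets it
  private final Map<String, Set<String>> pendingTimersByKey = new HashMap<>();

  void setCurrentKey(String key) {
    this.currentKey = key;
  }

  /** Returns the pending-timer set scoped to whatever key is current. */
  Set<String> pendingTimers() {
    if (currentKey == null) {
      throw new IllegalStateException("no current key set");
    }
    return pendingTimersByKey.computeIfAbsent(currentKey, k -> new HashSet<>());
  }
}

class ToyKeyedBackendDemo {
  public static void main(String[] args) {
    ToyKeyedBackend backend = new ToyKeyedBackend();

    // Case 1: a timer is registered before anything has set a key.
    try {
      backend.pendingTimers().add("t1");
    } catch (IllegalStateException e) {
      System.out.println("failed: " + e.getMessage()); // failed: no current key set
    }

    // Case 2: a key is set, but it is the wrong one for this timer.
    backend.setCurrentKey("a");        // e.g. left over from an earlier state request
    backend.pendingTimers().add("t1"); // the timer actually belongs to key "b"
    backend.setCurrentKey("b");
    System.out.println(backend.pendingTimers().contains("t1")); // false
  }
}
```

In the second case the timer is recorded under key "a", so a later delete or reset issued under key "b" cannot find it.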
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179676&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179676 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 29/Dec/18 11:32
Start Date: 29/Dec/18 11:32
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #7359: [BEAM-4681] Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#discussion_r244481038

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java ##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nonnull;
+
+/**
+ * A lock which can always be acquired. It should not be used when a proper lock is required, but it
+ * is useful as a performance optimization when locking is not necessary but the code paths have to
+ * be shared between the locking and the non-locking variant.
+ *
+ * For example, in {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}, the
+ * locking on the state backend is only required when both timers and state are used.
+ */
+public class NoopLock implements Lock, Serializable {

Review comment:
I thought about adding it to the SDK, but I wasn't sure how useful this would be for reuse. The use case is kind of a special optimization. I don't know how much performance it gives, as this depends on the number of state requests, but it also provides a hint how the locking granularity works for people reading the Runner code. There is a good chance someone who needs it would find it and move it to a different location.

Issue Time Tracking
---
Worklog Id: (was: 179676)
Time Spent: 26h (was: 25h 50m)
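The idea in the Javadoc quoted above (one shared code path, with a real lock only where it is needed) can be sketched as follows. This is an illustration under stated assumptions rather than the class from the pull request: `NoopLockSketch` and `SharedPath` are hypothetical names, the choice to reject `newCondition()` is an assumption of this sketch, and only the standard `java.util.concurrent.locks` API is used.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a lock that never blocks: every acquire succeeds immediately. */
class NoopLockSketch implements Lock, Serializable {
  @Override public void lock() {}
  @Override public void lockInterruptibly() {}
  @Override public boolean tryLock() { return true; }
  @Override public boolean tryLock(long time, TimeUnit unit) { return true; }
  @Override public void unlock() {}

  @Override public Condition newCondition() {
    // Assumption of this sketch: conditions make no sense without real locking.
    throw new UnsupportedOperationException("conditions are not supported by this sketch");
  }
}

/** Hypothetical caller: the same lock/unlock path serves both variants. */
class SharedPath {
  private final Lock lock;
  private int counter;

  SharedPath(boolean needsLocking) {
    // Pick a real lock only when concurrent access is actually possible.
    this.lock = needsLocking ? new ReentrantLock() : new NoopLockSketch();
  }

  int increment() {
    lock.lock();
    try {
      return ++counter;
    } finally {
      lock.unlock();
    }
  }
}
```

The caller never branches on whether locking is enabled; the decision is made once, when the lock instance is chosen.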
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179609 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 29/Dec/18 04:04
Start Date: 29/Dec/18 04:04
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #7359: [BEAM-4681] Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#discussion_r29305

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/NoopLock.java ##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nonnull;
+
+/**
+ * A lock which can always be acquired. It should not be used when a proper lock is required, but it
+ * is useful as a performance optimization when locking is not necessary but the code paths have to
+ * be shared between the locking and the non-locking variant.
+ *
+ * For example, in {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}, the
+ * locking on the state backend is only required when both timers and state are used.
+ */
+public class NoopLock implements Lock, Serializable {

Review comment:
Since this may be useful outside of the Flink runner, maybe move it to a common package like `org.apache.beam.sdk.util`?

Issue Time Tracking
---
Worklog Id: (was: 179609)
Time Spent: 25h 50m (was: 25h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179532 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 28/Dec/18 18:48
Start Date: 28/Dec/18 18:48
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7359: [BEAM-4681] Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#issuecomment-450408419

No new test failures in `PortableValidatesRunner Streaming`. We need to address those separately: https://issues.apache.org/jira/browse/BEAM-6326

Issue Time Tracking
---
Worklog Id: (was: 179532)
Time Spent: 25h 40m (was: 25.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179494 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 28/Dec/18 15:24
Start Date: 28/Dec/18 15:24
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7359: [BEAM-4681] Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359#issuecomment-450376249

Run Java Flink PortableValidatesRunner Streaming

Issue Time Tracking
---
Worklog Id: (was: 179494)
Time Spent: 25.5h (was: 25h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=179131&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179131 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 27/Dec/18 16:07
Start Date: 27/Dec/18 16:07
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #7359: [BEAM-4681] Address synchronization issue for portable timers
URL: https://github.com/apache/beam/pull/7359

This fixes a regression of BEAM-867 where the key needs to be set on the state backend to support deleting pending timers on timer registration or firing. This can interfere with accessing user-defined state, but could also interfere when a timer is set at the same time that a timer fires.

Issue Time Tracking
---
Worklog Id: (was: 179131)
Time Spent: 25h 20m (was: 25h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167434&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167434 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 16:04
Start Date: 19/Nov/18 16:04
Worklog Time Spent: 10m
Work Description: mxm closed pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index ff1d38b859f..0147085ab34 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -243,8 +243,6 @@ class BeamModulePlugin implements Plugin {
 excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
 excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
 excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-// TODO Enable test once timer-support for batch is merged
-excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
 //SplitableDoFnTests
 excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
 excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 2ed31b8ba48..3f26d5e2d99 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.flink;
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 import com.google.common.collect.BiMap;
@@ -55,7 +57,6 @@
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvKeySelector;
-import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
@@ -299,16 +300,13 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli
 private static void translateExecutableStage(
 PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
-// TODO: Fail on stateful DoFns for now.
-// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
 // TODO: Fail on splittable DoFns.
 // TODO: Special-case single outputs to avoid multiplexing PCollections.
 RunnerApi.Components components = pipeline.getComponents();
 Map outputs = transform.getTransform().getOutputsMap();
 // Mapping from PCollection id to coder tag id.
-BiMap outputMap =
-FlinkPipelineTranslatorUtils.createOutputMap(outputs.values());
+BiMap outputMap = createOutputMap(outputs.values());
 // Collect all output Coders and create a UnionCoder for our tagged outputs.
 List> unionCoders = Lists.newArrayList();
 // Enforce tuple tag sorting by union tag index.
@@ -338,21 +336,22 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli
 }
 String inputPCollectionId = stagePayload.getInput();
+Coder> windowedInputCoder =
+instantiateCoder(inputPCollectionId, components);
+
 DataSet> inputDataSet = context.getDataSetOrThrow(inputPCollectionId);
-final boolean stateful = stagePayload.getUserStatesCount() > 0;
 final FlinkExecutableStageFunction function =
 new FlinkExecutableStageFunction<>(
 stagePayload,
 context.getJobInfo(),
 outputMap,
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167422 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 15:46
Start Date: 19/Nov/18 15:46
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439938919

Run Website PreCommit

Issue Time Tracking
---
Worklog Id: (was: 167422)
Time Spent: 25h (was: 24h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167406&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167406 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 14:37
Start Date: 19/Nov/18 14:37
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234641503

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ##
@@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 ReceiverFactory(Collector collector, Map outputMap) {
+ this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
 this.collector = collector;
 this.outputMap = outputMap;
+ this.timerReceiverFactory = timerReceiverFactory;
 }
 @Override
 public FnDataReceiver create(String collectionId) {
 Integer unionTag = outputMap.get(collectionId);
- checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
- int tagInt = unionTag;
+ if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+ synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));

Review comment:
The stage-by-stage is only in place if the `ExecutionMode` is set to `FORCE_BATCH`, instead of the default `PIPELINE`.

Issue Time Tracking
---
Worklog Id: (was: 167406)
Time Spent: 24h 50m (was: 24h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167405 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 14:33
Start Date: 19/Nov/18 14:33
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439912526

Run Flink ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 167405)
Time Spent: 24h 40m (was: 24.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167400 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 14:14
Start Date: 19/Nov/18 14:14
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234629349

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ##
@@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 ReceiverFactory(Collector collector, Map outputMap) {
+ this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
 this.collector = collector;
 this.outputMap = outputMap;
+ this.timerReceiverFactory = timerReceiverFactory;
 }
 @Override
 public FnDataReceiver create(String collectionId) {
 Integer unionTag = outputMap.get(collectionId);
- checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
- int tagInt = unionTag;
+ if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+ synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));

Review comment:
Yes, but I don't know if it is applicable for batch since it is scheduled stage by stage. For streaming the issue would surface just by running wordcount, for batch not.

Issue Time Tracking
---
Worklog Id: (was: 167400)
Time Spent: 24h 20m (was: 24h 10m)
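The `synchronized (collectorLock)` block in the hunk above guards a shared collector that several output receivers write to. A minimal sketch of that pattern, assuming receivers may be invoked from different threads: `GuardedReceivers` is a hypothetical class, a plain `ArrayList` stands in for the Flink `Collector`, and `java.util.function.Consumer` stands in for `FnDataReceiver`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: several receivers funnel elements into one non-thread-safe sink. */
class GuardedReceivers {
  private final Object collectorLock = new Object();
  private final List<String> collector = new ArrayList<>(); // stand-in for a Flink Collector

  /** Creates a receiver that tags each element and appends it under the shared lock. */
  Consumer<String> create(int unionTag) {
    return element -> {
      synchronized (collectorLock) {
        collector.add(unionTag + ":" + element);
      }
    };
  }

  int size() {
    synchronized (collectorLock) {
      return collector.size();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    GuardedReceivers receivers = new GuardedReceivers();
    Thread[] threads = new Thread[4];
    for (int t = 0; t < threads.length; t++) {
      Consumer<String> receiver = receivers.create(t);
      threads[t] = new Thread(() -> {
        for (int i = 0; i < 1000; i++) {
          receiver.accept("e" + i);
        }
      });
      threads[t].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
    System.out.println(receivers.size()); // 4000
  }
}
```

Without the lock, concurrent `add` calls on the `ArrayList` could drop elements or throw, which is the kind of failure the review thread is probing for.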
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167398&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167398 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 14:13
Start Date: 19/Nov/18 14:13
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439905785

Run Website PreCommit

Issue Time Tracking
---
Worklog Id: (was: 167398)
Time Spent: 24h 10m (was: 24h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167392 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 13:54
Start Date: 19/Nov/18 13:54
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439899958

Run Website PreCommit

Issue Time Tracking
---
Worklog Id: (was: 167392)
Time Spent: 24h (was: 23h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167385=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167385 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 13:33
Start Date: 19/Nov/18 13:33
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439894046

Run Python Flink ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 167385)
Time Spent: 23h 50m (was: 23h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167384=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167384 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 13:33
Start Date: 19/Nov/18 13:33
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439893905

Squashed for merge.

Issue Time Tracking
---
Worklog Id: (was: 167384)
Time Spent: 23h 40m (was: 23.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167359 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 11:50
Start Date: 19/Nov/18 11:50
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439867543

Run Python Flink ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 167359)
Time Spent: 23.5h (was: 23h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167358=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167358 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 11:50
Start Date: 19/Nov/18 11:50
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#issuecomment-439867471

Run Website PreCommit

Issue Time Tracking
---
Worklog Id: (was: 167358)
Time Spent: 23h 20m (was: 23h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167336=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167336 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 10:26
Start Date: 19/Nov/18 10:26
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234565020

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ##
@@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 ReceiverFactory(Collector collector, Map outputMap) {
+ this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
 this.collector = collector;
 this.outputMap = outputMap;
+ this.timerReceiverFactory = timerReceiverFactory;
 }
 @Override
 public FnDataReceiver create(String collectionId) {
 Integer unionTag = outputMap.get(collectionId);
- checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
- int tagInt = unionTag;
+ if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+ synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));

Review comment:
Good question. I think you are right. This can deadlock if the receiving side backpressures because it is waiting for input from another channel generated by this operator. Is that what you meant?

Issue Time Tracking
---
Worklog Id: (was: 167336)
Time Spent: 23h 10m (was: 23h)
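The deadlock concern in this exchange comes from emitting downstream while still on the receiving (RPC handler) thread. One possible way to avoid blocking there, shown only as a sketch and not as what PR #7008 does, is to let the receiving thread enqueue elements and have the operator's own thread forward them later. The class `BufferedOutput` and its methods are hypothetical names for illustration and are not Beam or Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical hand-off buffer; not part of Beam or Flink. */
class BufferedOutput<T> {
  // Unbounded queue, so add() never blocks the receiving thread.
  private final LinkedBlockingQueue<T> buffer = new LinkedBlockingQueue<>();

  /** Called from the receiving (RPC handler) thread: only enqueues. */
  void accept(T element) {
    buffer.add(element);
  }

  /** Called from the operator's own thread: forwards buffered elements downstream. */
  int drainTo(Consumer<T> collector) {
    List<T> batch = new ArrayList<>();
    buffer.drainTo(batch);
    batch.forEach(collector);
    return batch.size();
  }
}
```

The trade-off in this sketch is that an unbounded buffer gives up backpressure toward the sender in exchange for never blocking the receiving thread, so memory use has to be watched.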
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167333 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 10:08
Start Date: 19/Nov/18 10:08
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234558497

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ##
@@ -927,22 +941,53 @@ public void setTimer(
 /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */
 @Deprecated
 @Override
-public void setTimer(TimerData timerKey) {
- long time = timerKey.getTimestamp().getMillis();
- switch (timerKey.getDomain()) {
+public void setTimer(TimerData timer) {
+ try {
+getKeyedStateBackend().setCurrentKey(getCurrentKey());
+String uniqueTimerId = getUniqueTimerId(timer);

Review comment:
It's because a new timer with timerId X resets the currently pending timer X in the same context. So we have to look up and remove the current timer and then set the new timer which involves adding it back to the "pending timers" map.

Issue Time Tracking
---
Worklog Id: (was: 167333)
Time Spent: 23h (was: 22h 50m)
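The reset behaviour described in this reply (look up the pending timer with the same id, remove it, then register the new one and record it as pending again) can be sketched roughly as follows. This is an illustration only, not the DoFnOperator code: `PendingTimers` is a hypothetical class, and the plain `HashMap` and `HashSet` stand in for the keyed map state and the runner's timer service.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the "pending timers" bookkeeping; not Beam or Flink API. */
class PendingTimers {
  /** timerId -> timestamp (millis) of the timer that has not fired yet. */
  private final Map<String, Long> pendingById = new HashMap<>();
  /** Stand-in for the timer service: one entry per registered firing. */
  private final Set<String> registered = new HashSet<>();

  /** Setting a timer with an id that is already pending replaces the old timer. */
  void setTimer(String timerId, long timestampMillis) {
    Long previous = pendingById.remove(timerId);
    if (previous != null) {
      // Cancel the old registration so that it never fires.
      registered.remove(timerId + "@" + previous);
    }
    // Register the new timer and add it back to the pending map.
    registered.add(timerId + "@" + timestampMillis);
    pendingById.put(timerId, timestampMillis);
  }

  /** Once a timer has fired it is no longer pending. */
  void onFired(String timerId) {
    Long timestamp = pendingById.remove(timerId);
    if (timestamp != null) {
      registered.remove(timerId + "@" + timestamp);
    }
  }

  int registeredCount() {
    return registered.size();
  }

  Long pendingTimestamp(String timerId) {
    return pendingById.get(timerId);
  }
}
```

Without the pending map, the second `setTimer` call for the same id would have no way to find the earlier registration, and both timers would fire.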
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167327=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167327 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 09:42
Start Date: 19/Nov/18 09:42
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234549296

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ##
@@ -918,6 +922,16 @@ public TimerInternals timerInternals() {
 class FlinkTimerInternals implements TimerInternals {
+/** Pending timers which are necessary for supporting removal of existing timers. */

Review comment:
I'll clarify. Basically timers which have not been fired yet.

Issue Time Tracking
---
Worklog Id: (was: 167327)
Time Spent: 22h 50m (was: 22h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167326=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167326 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 19/Nov/18 09:40
Start Date: 19/Nov/18 09:40
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234548677

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ##
@@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler(
 public void mapPartition(
 Iterable> iterable, Collector collector)
 throws Exception {
-processElements(iterable, collector);
+
+ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+ processElements(iterable, bundle);
+}
 }
- /** For stateful processing via a GroupReduceFunction. */
+ /** For stateful and timer processing via a GroupReduceFunction. */
 @Override
 public void reduce(Iterable> iterable, Collector collector)
 throws Exception {
-bagUserStateHandlerFactory.resetForNewKey();
-processElements(iterable, collector);
+
+// Need to discard the old key's state
+if (bagUserStateHandlerFactory != null) {
+ bagUserStateHandlerFactory.resetForNewKey();
+}
+
+// Used with Batch, we know that all the data is available for this key. We can't use the
+// timer manager from the context because it doesn't exist. So we create one and advance
+// time to the end after processing all elements.
+final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+timerInternals.advanceProcessingTime(Instant.now());
+timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+ReceiverFactory receiverFactory =
+new ReceiverFactory(
+collector,
+outputMap,
+new TimerReceiverFactory(
+stageBundleFactory,
+executableStage.getTimers(),
+ stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
+(WindowedValue timerElement, TimerInternals.TimerData timerData) -> {
+ currentTimerKey = (((KV) timerElement.getValue()).getKey());
+ timerInternals.setTimer(timerData);
+},
+windowCoder));
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+ processElements(iterable, bundle);
+}
+
+// Finish any pending windows by advancing the input watermark to infinity.
+timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+// Finally, advance the processing time to infinity to fire any timers.
+timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+
+ fireEligibleTimers(
+ timerInternals,
+ (String timerId, WindowedValue timerValue) -> {
+FnDataReceiver> fnTimerReceiver =
+bundle.getInputReceivers().get(timerId);
+Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver found for %s", timerId);
+try {
+ fnTimerReceiver.accept(timerValue);
+} catch (Exception e) {
+ throw new RuntimeException(
+ String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+}
+ });
+}
 }
- private void processElements(
- Iterable> iterable, Collector collector)
+ private void processElements(Iterable> iterable, RemoteBundle bundle)
 throws Exception {
-checkState(
-runtimeContext == getRuntimeContext(),
-"RuntimeContext changed from under us. State handler invalid.");
-checkState(
-stageBundleFactory != null, "%s not yet prepared", StageBundleFactory.class.getName());
-checkState(
-stateRequestHandler != null, "%s not yet prepared", StateRequestHandler.class.getName());
+Preconditions.checkArgument(bundle != null, "RemoteBundle must not be null");
+
+String inputPCollectionId = executableStage.getInputPCollection().getId();
+FnDataReceiver> mainReceiver =
+Preconditions.checkNotNull(
+bundle.getInputReceivers().get(inputPCollectionId),
+"Main
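The batch strategy in the quoted diff (process every element for the key first, then advance time to the maximum timestamp so that all remaining timers become eligible) can be sketched in isolation as below. This is a simplified illustration and not Beam's `InMemoryTimerInternals`: the class `BatchTimers`, its methods, and the use of `Long.MAX_VALUE` in place of `BoundedWindow.TIMESTAMP_MAX_VALUE` are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical in-memory timer queue for a single key; not Beam API. */
class BatchTimers {
  /** Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE. */
  static final long END_OF_TIME = Long.MAX_VALUE;

  private static final class Timer {
    final String id;
    final long timestamp;

    Timer(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }
  }

  // Timers ordered by timestamp, earliest first.
  private final PriorityQueue<Timer> queue =
      new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
  private long currentTime = Long.MIN_VALUE;

  void setTimer(String id, long timestamp) {
    queue.add(new Timer(id, timestamp));
  }

  /** Time only moves forward. */
  void advanceTo(long newTime) {
    currentTime = Math.max(currentTime, newTime);
  }

  /** Fires every timer whose timestamp is at or before the current time, earliest first. */
  List<String> fireEligibleTimers(BiConsumer<String, Long> callback) {
    List<String> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp <= currentTime) {
      Timer timer = queue.poll();
      // The callback may set further timers; the loop picks those up as well.
      callback.accept(timer.id, timer.timestamp);
      fired.add(timer.id);
    }
    return fired;
  }
}
```

Because all input for the key is already available in batch mode, nothing is lost by jumping straight to the end of time once the elements have been processed.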
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167268 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234464322

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ##
@@ -918,6 +922,16 @@ public TimerInternals timerInternals() {
 class FlinkTimerInternals implements TimerInternals {
+/** Pending timers which are necessary for supporting removal of existing timers. */
+private final MapState pendingTimersById;
+
+private FlinkTimerInternals() {
+ MapStateDescriptor pendingTimersByIdStateDescriptor =
+ new MapStateDescriptor<>(
+ "timer-dedup", new StringSerializer(), new CoderTypeSerializer<>(timerCoder));

Review comment:
"timer-dedup" => "pending-timers" ?

Issue Time Tracking
---
Worklog Id: (was: 167268)
Time Spent: 22h 10m (was: 22h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167271=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167271 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234464396

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ##
@@ -927,22 +941,53 @@ public void setTimer(
 /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */
 @Deprecated
 @Override
-public void setTimer(TimerData timerKey) {
- long time = timerKey.getTimestamp().getMillis();
- switch (timerKey.getDomain()) {
+public void setTimer(TimerData timer) {
+ try {
+getKeyedStateBackend().setCurrentKey(getCurrentKey());
+String uniqueTimerId = getUniqueTimerId(timer);

Review comment:
The logic here also needs explanation. We first remove the pending timer but then it is being put back in `registerTimer`?

Issue Time Tracking
---
Worklog Id: (was: 167271)
Time Spent: 22.5h (was: 22h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167267=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167267 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234463725

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ##
@@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler(
 public void mapPartition(
 Iterable> iterable, Collector collector)
 throws Exception {
-processElements(iterable, collector);
+
+ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+ processElements(iterable, bundle);
+}
 }
- /** For stateful processing via a GroupReduceFunction. */
+ /** For stateful and timer processing via a GroupReduceFunction. */
 @Override
 public void reduce(Iterable> iterable, Collector collector)
 throws Exception {
-bagUserStateHandlerFactory.resetForNewKey();
-processElements(iterable, collector);
+
+// Need to discard the old key's state
+if (bagUserStateHandlerFactory != null) {
+ bagUserStateHandlerFactory.resetForNewKey();
+}
+
+// Used with Batch, we know that all the data is available for this key. We can't use the
+// timer manager from the context because it doesn't exist. So we create one and advance
+// time to the end after processing all elements.
+final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+timerInternals.advanceProcessingTime(Instant.now());
+timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+ReceiverFactory receiverFactory =
+new ReceiverFactory(
+collector,
+outputMap,
+new TimerReceiverFactory(
+stageBundleFactory,
+executableStage.getTimers(),
+ stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
+(WindowedValue timerElement, TimerInternals.TimerData timerData) -> {
+ currentTimerKey = (((KV) timerElement.getValue()).getKey());
+ timerInternals.setTimer(timerData);
+},
+windowCoder));
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+ processElements(iterable, bundle);
+}
+
+// Finish any pending windows by advancing the input watermark to infinity.
+timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+// Finally, advance the processing time to infinity to fire any timers.
+timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+try (RemoteBundle bundle =
+stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+
+ fireEligibleTimers(
+ timerInternals,
+ (String timerId, WindowedValue timerValue) -> {
+FnDataReceiver> fnTimerReceiver =
+bundle.getInputReceivers().get(timerId);
+Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver found for %s", timerId);
+try {
+ fnTimerReceiver.accept(timerValue);
+} catch (Exception e) {
+ throw new RuntimeException(
+ String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+}
+ });
+}
 }
- private void processElements(
- Iterable> iterable, Collector collector)
+ private void processElements(Iterable> iterable, RemoteBundle bundle)
 throws Exception {
-checkState(
-runtimeContext == getRuntimeContext(),
-"RuntimeContext changed from under us. State handler invalid.");
-checkState(
-stageBundleFactory != null, "%s not yet prepared", StageBundleFactory.class.getName());
-checkState(
-stateRequestHandler != null, "%s not yet prepared", StateRequestHandler.class.getName());
+Preconditions.checkArgument(bundle != null, "RemoteBundle must not be null");
+
+String inputPCollectionId = executableStage.getInputPCollection().getId();
+FnDataReceiver> mainReceiver =
+Preconditions.checkNotNull(
+bundle.getInputReceivers().get(inputPCollectionId),
+"Main
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167269 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 18/Nov/18 22:13
Start Date: 18/Nov/18 22:13
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r234463917

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ##
@@ -230,20 +330,92 @@ public void close() throws Exception {
 private final Collector collector;
 private final Map outputMap;
+@Nullable private final TimerReceiverFactory timerReceiverFactory;
 ReceiverFactory(Collector collector, Map outputMap) {
+ this(collector, outputMap, null);
+}
+
+ReceiverFactory(
+Collector collector,
+Map outputMap,
+@Nullable TimerReceiverFactory timerReceiverFactory) {
 this.collector = collector;
 this.outputMap = outputMap;
+ this.timerReceiverFactory = timerReceiverFactory;
 }
 @Override
 public FnDataReceiver create(String collectionId) {
 Integer unionTag = outputMap.get(collectionId);
- checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
- int tagInt = unionTag;
+ if (unionTag != null) {
+int tagInt = unionTag;
+return receivedElement -> {
+ synchronized (collectorLock) {
+collector.collect(new RawUnionValue(tagInt, receivedElement));

Review comment:
In streaming doing this in the RPC handler can cause a deadlock when collect triggers the next bundle in an operator chain. Presumably that cannot happen due to different scheduling in batch?

Issue Time Tracking
---
Worklog Id: (was: 167269)
Time Spent: 22h 10m (was: 22h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167270=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167270 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 18/Nov/18 22:13
    Start Date: 18/Nov/18 22:13
    Worklog Time Spent: 10m
    Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#discussion_r234464176

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##
@@ -230,20 +330,92 @@ public void close() throws Exception {
     private final Collector collector;
     private final Map outputMap;
+    @Nullable private final TimerReceiverFactory timerReceiverFactory;

     ReceiverFactory(Collector collector, Map outputMap) {
+      this(collector, outputMap, null);
+    }
+
+    ReceiverFactory(
+        Collector collector,
+        Map outputMap,
+        @Nullable TimerReceiverFactory timerReceiverFactory) {
       this.collector = collector;
       this.outputMap = outputMap;
+      this.timerReceiverFactory = timerReceiverFactory;
     }

     @Override
     public FnDataReceiver create(String collectionId) {
       Integer unionTag = outputMap.get(collectionId);
-      checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId);
-      int tagInt = unionTag;
+      if (unionTag != null) {
+        int tagInt = unionTag;
+        return receivedElement -> {
+          synchronized (collectorLock) {
+            collector.collect(new RawUnionValue(tagInt, receivedElement));
+          }
+        };
+      } else if (timerReceiverFactory != null) {
+        // Delegate to TimerReceiverFactory
+        return timerReceiverFactory.create(collectionId);
+      } else {
+        throw new IllegalStateException(
+            String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId));
+      }
+    }
+  }
+
+  private static class TimerReceiverFactory implements OutputReceiverFactory {
+
+    private final StageBundleFactory stageBundleFactory;
+    /** Timer PCollection id => TimerReference. */
+    private final HashMap timerOutputIdToSpecMap;
+    /** Timer PCollection id => timer name => TimerSpec. */
+    private final Map> timerSpecMap;
+
+    private final BiConsumer timerDataConsumer;
+    private final Coder windowCoder;
+
+    TimerReceiverFactory(
+        StageBundleFactory stageBundleFactory,
+        Collection timerReferenceCollection,
+        Map> timerSpecMap,
+        BiConsumer timerDataConsumer,
+        Coder windowCoder) {
+      this.stageBundleFactory = stageBundleFactory;
+      this.timerOutputIdToSpecMap = new HashMap<>();
+      // Gather all timers from all transforms by their output pCollectionId which is unique
+      for (Map transformTimerMap :
+          stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
+        for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
+          timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
+        }
+      }
+      this.timerSpecMap = timerSpecMap;
+      this.timerDataConsumer = timerDataConsumer;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public FnDataReceiver create(String pCollectionId) {
+      final ProcessBundleDescriptors.TimerSpec timerSpec =
+          timerOutputIdToSpecMap.get(pCollectionId);
+
       return receivedElement -> {
-        synchronized (collectorLock) {
-          collector.collect(new RawUnionValue(tagInt, receivedElement));
+        WindowedValue windowedValue = (WindowedValue) receivedElement;
+        Timer timer =
+            Preconditions.checkNotNull(
+                (Timer) ((KV) windowedValue.getValue()).getValue(),
+                "Received null Timer from SDK harness: %s",
+                receivedElement);
+        LOG.info("Timer received: {} {}", pCollectionId, timer);

Review comment:
debug

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 167270)
    Time Spent: 22h 20m (was: 22h 10m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
>
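For readers following the review, the fallback in ReceiverFactory.create quoted in the diff above can be sketched in isolation. This is a minimal sketch, not the Beam code: SketchReceiverFactory, DelegatingReceiverFactory and ReceiverDelegationDemo are hypothetical names, a java.util.function.Consumer stands in for FnDataReceiver, and a plain list stands in for the Flink Collector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for an output receiver factory; not the real Beam interface. */
interface SketchReceiverFactory {
  Consumer<Object> create(String collectionId);
}

/** Serves known output ids itself and hands every other id to an optional fallback factory. */
class DelegatingReceiverFactory implements SketchReceiverFactory {
  private final Object collectorLock = new Object();
  private final List<String> collected; // stands in for the Flink Collector
  private final Map<String, Integer> outputMap; // collection id => union tag
  private final SketchReceiverFactory timerReceiverFactory; // may be null

  DelegatingReceiverFactory(
      List<String> collected,
      Map<String, Integer> outputMap,
      SketchReceiverFactory timerReceiverFactory) {
    this.collected = collected;
    this.outputMap = outputMap;
    this.timerReceiverFactory = timerReceiverFactory;
  }

  @Override
  public Consumer<Object> create(String collectionId) {
    Integer unionTag = outputMap.get(collectionId);
    if (unionTag != null) {
      // Regular output: tag the element and emit it under the shared lock.
      int tagInt = unionTag;
      return element -> {
        synchronized (collectorLock) {
          collected.add(tagInt + ":" + element);
        }
      };
    } else if (timerReceiverFactory != null) {
      // Unknown to the output map: assume it is a timer output and delegate.
      return timerReceiverFactory.create(collectionId);
    } else {
      throw new IllegalStateException(
          String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId));
    }
  }
}

class ReceiverDelegationDemo {
  static List<String> run() {
    List<String> out = new ArrayList<>();
    Map<String, Integer> tags = new HashMap<>();
    tags.put("main", 0);
    // Hypothetical timer factory that only records what it receives.
    SketchReceiverFactory timers = id -> element -> out.add("timer(" + id + "):" + element);
    DelegatingReceiverFactory factory = new DelegatingReceiverFactory(out, tags, timers);
    factory.create("main").accept("a");
    factory.create("timer-out").accept("t1");
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

With no fallback factory configured, an id missing from the output map raises IllegalStateException, which mirrors the last branch of the quoted diff.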
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167272=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167272 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 18/Nov/18 22:13
    Start Date: 18/Nov/18 22:13
    Worklog Time Spent: 10m
    Work Description: tweise commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#discussion_r234464294

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##
@@ -918,6 +922,16 @@ public TimerInternals timerInternals() {
   class FlinkTimerInternals implements TimerInternals {

+    /** Pending timers which are necessary for supporting removal of existing timers. */

Review comment:
What are "pending timers"?

Issue Time Tracking
-------------------

    Worklog Id: (was: 167272)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Luke Cwik
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability, portability-flink
> Fix For: 2.9.0
>
> Time Spent: 22.5h
> Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
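The Javadoc under review states the purpose of pending timers (supporting removal of existing timers) but not the mechanism. One plausible reading, sketched here under stated assumptions, is that the runner remembers every timer that has been set but has not yet fired, keyed by timer id, so that a later set or delete can look up the old timestamp and remove the earlier registration. All names below (PendingTimersSketch, the registered set standing in for a timer service that can only delete by id plus timestamp) are hypothetical and are not taken from DoFnOperator.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: tracks timers that are set but not yet fired so they can be removed. */
class PendingTimersSketch {
  // Stands in for a timer service that needs both id and timestamp to delete a timer.
  private final Set<String> registered = new HashSet<>();
  // Pending timers by id: remembers the timestamp of the latest registration.
  private final Map<String, Long> pendingTimersById = new HashMap<>();

  private static String key(String timerId, long timestamp) {
    return timerId + "@" + timestamp;
  }

  /** Sets a timer; an earlier timer with the same id is removed first. */
  void setTimer(String timerId, long timestamp) {
    deleteTimer(timerId);
    pendingTimersById.put(timerId, timestamp);
    registered.add(key(timerId, timestamp));
  }

  /** Deletes a timer by id alone, using the remembered timestamp. */
  void deleteTimer(String timerId) {
    Long previous = pendingTimersById.remove(timerId);
    if (previous != null) {
      registered.remove(key(timerId, previous));
    }
  }

  /** Called when a timer fires: it is no longer pending. */
  void onTimerFired(String timerId, long timestamp) {
    registered.remove(key(timerId, timestamp));
    pendingTimersById.remove(timerId, timestamp);
  }

  boolean isRegistered(String timerId, long timestamp) {
    return registered.contains(key(timerId, timestamp));
  }

  int registeredCount() {
    return registered.size();
  }

  public static void main(String[] args) {
    PendingTimersSketch timers = new PendingTimersSketch();
    timers.setTimer("cleanup", 10L);
    timers.setTimer("cleanup", 20L); // resets the timer instead of adding a second one
    System.out.println(timers.registeredCount());
  }
}
```

Without the map, resetting a timer would leave the earlier registration in place, because the caller supplies only the timer id and not the timestamp it was originally set for.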
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166944=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166944 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 16:39
    Start Date: 16/Nov/18 16:39
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439451923

Same goes for "Run JavaPortabilityApi PreCommit" which fails continuously at the moment for all commits. My PR doesn't change anything regarding the portability API.

Issue Time Tracking
-------------------

    Worklog Id: (was: 166944)
    Time Spent: 21h 50m (was: 21h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166943=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166943 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 16:38
    Start Date: 16/Nov/18 16:38
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439451545

Unfortunately, the Jenkins setup for it seems to be broken at the moment.

Issue Time Tracking
-------------------

    Worklog Id: (was: 166943)
    Time Spent: 21h 40m (was: 21.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166940=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166940 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 16:38
    Start Date: 16/Nov/18 16:38
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439451306

Ran "Java Flink PortableValidatesRunner" locally with the latest master. All tests passed.

Issue Time Tracking
-------------------

    Worklog Id: (was: 166940)
    Time Spent: 21.5h (was: 21h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166882=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166882 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:14
    Start Date: 16/Nov/18 14:14
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439405481

Run Python Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166882)
    Time Spent: 21h (was: 20h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166881=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166881 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:14
    Start Date: 16/Nov/18 14:14
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439405438

Run Python Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166881)
    Time Spent: 20h 50m (was: 20h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166892=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166892 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:20
    Start Date: 16/Nov/18 14:20
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439407400

Run Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166892)
    Time Spent: 21h 20m (was: 21h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166891=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166891 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:20
    Start Date: 16/Nov/18 14:20
    Worklog Time Spent: 10m
    Work Description: mxm removed a comment on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439405481

Run Python Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166891)
    Time Spent: 21h 10m (was: 21h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166876=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166876 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:05
    Start Date: 16/Nov/18 14:05
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439402850

Run Python Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166876)
    Time Spent: 20h 40m (was: 20.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166874=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166874 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:02
    Start Date: 16/Nov/18 14:02
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439401903

Run Java Flink PortableValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166874)
    Time Spent: 20.5h (was: 20h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166873 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 14:02
    Start Date: 16/Nov/18 14:02
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439401869

Run Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166873)
    Time Spent: 20h 20m (was: 20h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166851=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166851 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 12:06
    Start Date: 16/Nov/18 12:06
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439373857

"Run Java Flink PortableValidatesRunner" seems to be broken due to Docker container issues.

Issue Time Tracking
-------------------

    Worklog Id: (was: 166851)
    Time Spent: 20h 10m (was: 20h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166848=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166848 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 12:04
    Start Date: 16/Nov/18 12:04
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439373448

Run Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166848)
    Time Spent: 19h 40m (was: 19.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166849=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166849 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 12:04
    Start Date: 16/Nov/18 12:04
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439373531

Run Python Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166849)
    Time Spent: 19h 50m (was: 19h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166850=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166850 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 12:04
    Start Date: 16/Nov/18 12:04
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439373552

Run Java Flink PortableValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166850)
    Time Spent: 20h (was: 19h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166841=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166841 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 11:09
    Start Date: 16/Nov/18 11:09
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439361428

Run Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166841)
    Time Spent: 19.5h (was: 19h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166838=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166838 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 10:35
    Start Date: 16/Nov/18 10:35
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439352585

Run Java Flink PortableValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166838)
    Time Spent: 19h 20m (was: 19h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166830=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166830 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 16/Nov/18 10:18
    Start Date: 16/Nov/18 10:18
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439347844

Run Java PreCommit

Issue Time Tracking
-------------------

    Worklog Id: (was: 166830)
    Time Spent: 19h (was: 18h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166629=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166629 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 15/Nov/18 20:06
    Start Date: 15/Nov/18 20:06
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439172785

Run Python Flink ValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166629)
    Time Spent: 18h 40m (was: 18.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166630 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 15/Nov/18 20:06
    Start Date: 15/Nov/18 20:06
    Worklog Time Spent: 10m
    Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
    URL: https://github.com/apache/beam/pull/7008#issuecomment-439172809

Run Java Flink PortableValidatesRunner

Issue Time Tracking
-------------------

    Worklog Id: (was: 166630)
    Time Spent: 18h 50m (was: 18h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=166016=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166016 ]

ASF GitHub Bot logged work on BEAM-4681:

    Author: ASF GitHub Bot
    Created on: 14/Nov/18 18:52
    Start Date: 14/Nov/18 18:52
    Worklog Time Spent: 10m
    Work Description: tweise closed pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
    URL: https://github.com/apache/beam/pull/6981

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bd83ad1ea47..ff1d38b859f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -243,6 +243,7 @@ class BeamModulePlugin implements Plugin {
   excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
   excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
   excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+  // TODO Enable test once timer-support for batch is merged
   excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
   //SplitableDoFnTests
   excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
@@ -1471,6 +1472,9 @@ artifactId=${project.name}
   "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
   "--jobServerDriver=${config.jobServerDriver}",
   "--environmentCacheMillis=1",
+  // TODO Create two tasks to run for both batch and streaming:
+  // https://issues.apache.org/jira/browse/BEAM-6009
+  // "--streaming"
   ]
   if (config.jobServerConfig) {
   beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index bf25dc65013..a8adc97edcf 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;

+import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;

 import com.fasterxml.jackson.databind.JsonNode;
@@ -562,8 +563,7 @@ private void translateStreamingImpulse(
   Coder keyCoder = null;
   KeySelector, ?> keySelector = null;
-  final boolean stateful = stagePayload.getUserStatesCount() > 0;
-  if (stateful) {
+  if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
   // Stateful stages are only allowed of KV input
   Coder valueCoder = ((WindowedValue.FullWindowedValueCoder) windowedInputCoder).getValueCoder();
@@ -601,6 +601,7 @@ private void translateStreamingImpulse(
   context.getJobInfo(),
   FlinkExecutableStageContext.factory(context.getPipelineOptions()),
   collectionIdToTupleTag,
+  getWindowingStrategy(inputPCollectionId, components),
   keyCoder,
   keySelector);

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
index 1710a27da7c..2ea3b9323d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
@@ -22,10 +22,14 @@
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165978 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 17:45
Start Date: 14/Nov/18 17:45
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233550751

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,13 +534,100 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId);
+          TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain());

Review comment:
Neat. With your fix I was able to build a `timerOutputIdToSpecMap` from output collection id to TimerSpec. This also got rid of some boiler plate code which I previously used to extract the TimerSpec.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165978)
Time Spent: 18h (was: 17h 50m)

> Integrate support for timers using the portability APIs into Flink
> --
>
> Key: BEAM-4681
> URL: https://issues.apache.org/jira/browse/BEAM-4681
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Luke Cwik
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability, portability-flink
> Time Spent: 18h
> Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
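The lookup described in the review comment above can be sketched roughly as follows. This is only an illustration under stated assumptions: `TimerSpecStub`, `OutputRouter`, the string tags and the example ids are hypothetical stand-ins, not Beam classes, and the actual operator works with `TupleTag`, `WindowedValue` and `TimerInternals.TimerData`. The point is that a result whose output collection id has no regular output tag is resolved directly against a map keyed by that same output collection id, and one timer is registered per window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: all types here are hypothetical stand-ins, not Beam classes. */
class TimerOutputRouting {

  /** Hypothetical, reduced timer spec holding just the fields this sketch needs. */
  static final class TimerSpecStub {
    final String timerName;
    final String timeDomain;

    TimerSpecStub(String timerName, String timeDomain) {
      this.timerName = timerName;
      this.timeDomain = timeDomain;
    }
  }

  /** Sends each harness result either to a regular output or to timer registration. */
  static final class OutputRouter {
    private final Map<String, String> outputMap; // output collection id -> output tag
    private final Map<String, TimerSpecStub> timerOutputIdToSpecMap;
    final List<String> emitted = new ArrayList<>();
    final List<String> timersSet = new ArrayList<>();

    OutputRouter(Map<String, String> outputMap, Map<String, TimerSpecStub> timerOutputIdToSpecMap) {
      this.outputMap = outputMap;
      this.timerOutputIdToSpecMap = timerOutputIdToSpecMap;
    }

    void route(String outputPCollectionId, Object value, List<String> windows) {
      String tag = outputMap.get(outputPCollectionId);
      if (tag != null) {
        // Regular element: forward it to the tagged output.
        emitted.add(tag + "=" + value);
        return;
      }
      // Not a regular output, so treat it as a timer output and look the spec up directly.
      TimerSpecStub spec = timerOutputIdToSpecMap.get(outputPCollectionId);
      if (spec == null) {
        throw new IllegalStateException("Unknown output collection: " + outputPCollectionId);
      }
      // One timer per window, mirroring the loop over windowedValue.getWindows() in the diff.
      for (String window : windows) {
        timersSet.add(spec.timerName + "@" + window + "/" + spec.timeDomain);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> outputs = new HashMap<>();
    outputs.put("main.out:0", "mainTag");
    Map<String, TimerSpecStub> timers = new HashMap<>();
    timers.put("ParDo.timer.foo.out:0", new TimerSpecStub("ParDo.timer.foo", "EVENT_TIME"));

    OutputRouter router = new OutputRouter(outputs, timers);
    router.route("main.out:0", "element", Arrays.asList("window-1"));
    router.route("ParDo.timer.foo.out:0", "timer", Arrays.asList("window-1", "window-2"));
    System.out.println(router.emitted);
    System.out.println(router.timersSet);
  }
}
```

Keying the map by the output collection id means no name has to be derived at lookup time, which is presumably the boilerplate the comment refers to.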
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165980 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 17:45
Start Date: 14/Nov/18 17:45
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#issuecomment-438753473

Run Java Flink PortableValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 165980)
Time Spent: 18h 20m (was: 18h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165979&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165979 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 17:45
Start Date: 14/Nov/18 17:45
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#issuecomment-438753446

Run Python Flink ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 165979)
Time Spent: 18h 10m (was: 18h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165953&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165953 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 16:32
Start Date: 14/Nov/18 16:32
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233522450

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,13 +534,100 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId);
+          TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain());

Review comment:
https://github.com/mxm/beam/pull/1 This also avoids hacks to guess the output name from the input name.

Issue Time Tracking
---
Worklog Id: (was: 165953)
Time Spent: 17h 50m (was: 17h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165945&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165945 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 16:17
Start Date: 14/Nov/18 16:17
Worklog Time Spent: 10m
Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#issuecomment-438721581

PVR are extremely slow because the Docker environment takes long to come up for every test:

```
INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment jenkins-docker-apache.bintray.io/beam/java for worker id 1
```

Eventually the 100 minute timeout kills the job.

Issue Time Tracking
---
Worklog Id: (was: 165945)
Time Spent: 17h 20m (was: 17h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165948 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 16:28
Start Date: 14/Nov/18 16:28
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233520762

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,13 +534,100 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId);
+          TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain());

Review comment:
OK, the problem is that there are multiple copies of components and pcollections floating around...

The right way to solve this is to patch in https://github.com/mxm/beam/pull/1/commits/55baf15befea582a3fc7796f9c0a4a031c3485b3 and then use stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs() in SdkHarnessDoFnRunner to construct timerReferenceMap or whatever else is needed, which has the full information.

Issue Time Tracking
---
Worklog Id: (was: 165948)
Time Spent: 17.5h (was: 17h 20m)
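One way to read the suggestion above is to build the lookup once from a single, complete description of the timers instead of from several copies of the pipeline components. The sketch below assumes, purely for illustration, that the timer specs arrive as a nested map from transform id to timer id to a declaration that already carries its output collection id. `TimerDecl` and `indexByOutputCollection` are hypothetical names, and the nested-map shape is an assumption rather than a statement about what `getTimerSpecs()` returns.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: TimerDecl and the nested-map input shape are assumptions for illustration. */
class TimerSpecIndex {

  /** Hypothetical declaration of one timer of one transform. */
  static final class TimerDecl {
    final String transformId;
    final String timerId;
    final String outputCollectionId;

    TimerDecl(String transformId, String timerId, String outputCollectionId) {
      this.transformId = transformId;
      this.timerId = timerId;
      this.outputCollectionId = outputCollectionId;
    }
  }

  /** Flattens transformId -> timerId -> declaration into outputCollectionId -> declaration. */
  static Map<String, TimerDecl> indexByOutputCollection(
      Map<String, Map<String, TimerDecl>> timerSpecs) {
    Map<String, TimerDecl> byOutput = new HashMap<>();
    for (Map<String, TimerDecl> perTransform : timerSpecs.values()) {
      for (TimerDecl decl : perTransform.values()) {
        TimerDecl previous = byOutput.put(decl.outputCollectionId, decl);
        if (previous != null) {
          // Two timers writing to the same collection would make the lookup ambiguous.
          throw new IllegalArgumentException(
              "Duplicate output collection id: " + decl.outputCollectionId);
        }
      }
    }
    return byOutput;
  }

  public static void main(String[] args) {
    Map<String, TimerDecl> parDoTimers = new LinkedHashMap<>();
    parDoTimers.put("foo", new TimerDecl("ParDo", "foo", "ParDo.timer.foo.out:0"));
    parDoTimers.put("bar", new TimerDecl("ParDo", "bar", "ParDo.timer.bar.out:0"));
    Map<String, Map<String, TimerDecl>> timerSpecs = new LinkedHashMap<>();
    timerSpecs.put("ParDo", parDoTimers);

    Map<String, TimerDecl> index = indexByOutputCollection(timerSpecs);
    System.out.println(index.get("ParDo.timer.foo.out:0").timerId);
  }
}
```

Because every declaration carries its own output collection id, the index needs no guessing of output names from input names.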
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165904&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165904 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 14:50
Start Date: 14/Nov/18 14:50
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#issuecomment-438687972

Run Java Flink PortableValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 165904)
Time Spent: 17h 10m (was: 17h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165901 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 14:36
Start Date: 14/Nov/18 14:36
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233471781

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,13 +534,100 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId);
+          TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain());

Review comment:
I think it will be best we merge this PR and revert #6986. BEAM-5999 can then rebase and address this issue.

Issue Time Tracking
---
Worklog Id: (was: 165901)
Time Spent: 17h (was: 16h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165884&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165884 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 13:55
Start Date: 14/Nov/18 13:55
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233443898

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,13 +533,100 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId);
+          TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain());
+            setTimer(windowedValue, timerData);
+          }
+        }
+      }
+    }
+
+    private String extractTimerPCollectionId(String outputPCollectionId) {
+      return stageBundleFactory
+          .getProcessBundleDescriptor()
+          .getProcessBundleDescriptor()
+          .getPcollectionsMap()
+          .get(outputPCollectionId)
+          .getUniqueName();

Review comment:
It is still necessary. The timerReferenceMap gives me `ParDo.timer.foo` but while processing the timer elements, I get `ParDo.timer.foo.out:0`. The issue seems to be that the output collection ids are not set correctly.

Issue Time Tracking
---
Worklog Id: (was: 165884)
Time Spent: 16h 40m (was: 16.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165886&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165886 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 14/Nov/18 13:58
Start Date: 14/Nov/18 13:58
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode
URL: https://github.com/apache/beam/pull/6981#discussion_r233456585

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,13 +533,100 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    /** Key for timer which has not been registered yet. */
+    Object getTimerKeyForRegistration() {
+      return keyForTimerToBeSet;
+    }
+
+    /** Key for timer which is about to be fired. */
+    void setTimerKeyForFire(Object key) {
+      this.keyForTimerToBeFired = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String outputPCollectionId = Preconditions.checkNotNull(result.getKey());
+        TupleTag tag = outputMap.get(outputPCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                outputPCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId);
+          TimerSpec timerSpec = extractTimerSpec(timerCollectionId);
+          Timer timer =
+              Preconditions.checkNotNull(
+                  (Timer) ((KV) windowedValue.getValue()).getValue(),
+                  "Received null Timer from SDK harness: %s",
+                  windowedValue);
+          LOG.debug("Timer received: {} {}", outputPCollectionId, timer);
+          for (Object window : windowedValue.getWindows()) {
+            StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window);
+            TimerInternals.TimerData timerData =
+                TimerInternals.TimerData.of(
+                    timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain());
+            setTimer(windowedValue, timerData);
+          }
+        }
+      }
+    }
+
+    private String extractTimerPCollectionId(String outputPCollectionId) {
+      return stageBundleFactory
+          .getProcessBundleDescriptor()
+          .getProcessBundleDescriptor()
+          .getPcollectionsMap()
+          .get(outputPCollectionId)
+          .getUniqueName();

Review comment:
> Unless I'm mistaken, extractTimerPCollectionId should not be needed at all anymore, as timerReferenceMap is indexed by outputPCollectionId itself.

It is still necessary. The timerReferenceMap gives me `ParDo.timer.foo` for the output name of timer "foo", when it is `ParDo.timer.foo.out:0` at runtime. I believe there lies the problem.

Issue Time Tracking
---
Worklog Id: (was: 165886)
Time Spent: 16h 50m (was: 16h 40m)
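The mismatch described in the comment above can be reproduced with plain maps. In this sketch the two example ids come from the comment, while `TimerNameLookup`, `timerNameFor` and the assumption that the collection's unique name equals the timer name are hypothetical and only serve to illustrate why the extra indirection was kept.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: the maps stand in for the descriptor lookups and are not Beam APIs. */
class TimerNameLookup {

  /** Resolves a runtime output collection id to a timer name via the collection's unique name. */
  static String timerNameFor(
      String outputPCollectionId, Map<String, String> uniqueNameByCollectionId) {
    String uniqueName = uniqueNameByCollectionId.get(outputPCollectionId);
    if (uniqueName == null) {
      throw new IllegalArgumentException("Unknown output collection: " + outputPCollectionId);
    }
    return uniqueName;
  }

  public static void main(String[] args) {
    // Keyed by timer name, as the comment reports for timerReferenceMap.
    Map<String, String> timerReferenceMap = new HashMap<>();
    timerReferenceMap.put("ParDo.timer.foo", "EVENT_TIME");

    // Assumed for illustration: the collection's unique name equals the timer name.
    Map<String, String> uniqueNameByCollectionId = new HashMap<>();
    uniqueNameByCollectionId.put("ParDo.timer.foo.out:0", "ParDo.timer.foo");

    String runtimeId = "ParDo.timer.foo.out:0";
    // A direct lookup with the runtime id misses because the keys differ.
    System.out.println(timerReferenceMap.containsKey(runtimeId));
    // Going through the unique name finds the entry.
    System.out.println(timerReferenceMap.get(timerNameFor(runtimeId, uniqueNameByCollectionId)));
  }
}
```

If the map were keyed by the runtime output collection id instead, the indirection through the unique name would not be needed.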
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165883=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165883 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 14/Nov/18 13:55 Start Date: 14/Nov/18 13:55 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r233455289 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,13 +534,100 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +/** Key for timer which has not been registered yet. */ +Object getTimerKeyForRegistration() { + return keyForTimerToBeSet; +} + +/** Key for timer which is about to be fired. 
*/ +void setTimerKeyForFire(Object key) { + this.keyForTimerToBeFired = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String outputPCollectionId = Preconditions.checkNotNull(result.getKey()); +TupleTag tag = outputMap.get(outputPCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +outputPCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId); + TimerSpec timerSpec = extractTimerSpec(timerCollectionId); + Timer timer = + Preconditions.checkNotNull( + (Timer) ((KV) windowedValue.getValue()).getValue(), + "Received null Timer from SDK harness: %s", + windowedValue); + LOG.debug("Timer received: {} {}", outputPCollectionId, timer); + for (Object window : windowedValue.getWindows()) { +StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window); +TimerInternals.TimerData timerData = +TimerInternals.TimerData.of( +timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain()); Review comment: >Timers are absolutely determined by their transform name and local name; probably it makes sense to use that That's what is used here. The keys of `timerReferenceMap` are transform name + local name. The "extractTimerPCollectionId" method does the mapping of output collection to timer name. 
>and then also have a mapping fullyQualifiedTimerName -> timerInputPCollectionId for plumbing timers to the SDK, and a map timerOutputPCollectionId -> fullyQualifiedTimerName for recording timers from the SDK (like we do here). I couldn't find a nice way to create a map timerOutputPCollectionId -> fullyQualifiedTimerName. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165883) Time Spent: 16.5h (was: 16h 20m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 16.5h > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165873 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 14/Nov/18 12:29 Start Date: 14/Nov/18 12:29 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r233426740 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,13 +534,100 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +/** Key for timer which has not been registered yet. */ +Object getTimerKeyForRegistration() { + return keyForTimerToBeSet; +} + +/** Key for timer which is about to be fired. 
*/ +void setTimerKeyForFire(Object key) { + this.keyForTimerToBeFired = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String outputPCollectionId = Preconditions.checkNotNull(result.getKey()); +TupleTag tag = outputMap.get(outputPCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +outputPCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId); + TimerSpec timerSpec = extractTimerSpec(timerCollectionId); + Timer timer = + Preconditions.checkNotNull( + (Timer) ((KV) windowedValue.getValue()).getValue(), + "Received null Timer from SDK harness: %s", + windowedValue); + LOG.debug("Timer received: {} {}", outputPCollectionId, timer); + for (Object window : windowedValue.getWindows()) { +StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window); +TimerInternals.TimerData timerData = +TimerInternals.TimerData.of( +timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain()); Review comment: The notion "timerCollectionId" is ambiguous, as timers are associated with two distinct PCollections at execution time. 
Timers are absolutely determined by their transform name and local name; probably it makes sense to use that (let's call it fullyQualifiedTimerName) for the TimerData here, and then also have a mapping fullyQualifiedTimerName -> timerInputPCollectionId for plumbing timers to the SDK, and a map timerOutputPCollectionId -> fullyQualifiedTimerName for recording timers from the SDK (like we do here). Issue Time Tracking --- Worklog Id: (was: 165873) Time Spent: 16h 10m (was: 16h)
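The two mappings proposed in the review comment above can be sketched with plain Java maps. This is only an illustration, not Beam code: the class `TimerNameIndex`, its method names, the `/` separator used to build the fully qualified name, and the input PCollection id `ParDo.timer.foo.in:0` are all hypothetical. Only the output id `ParDo.timer.foo.out:0` is taken from this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Beam: indexes timers by a fully qualified name. */
final class TimerNameIndex {
  // fullyQualifiedTimerName -> PCollection id used to plumb fired timers to the SDK harness.
  private final Map<String, String> inputPCollectionByTimer = new HashMap<>();
  // PCollection id on which the SDK harness emits timers -> fullyQualifiedTimerName.
  private final Map<String, String> timerByOutputPCollection = new HashMap<>();

  /** Joins transform name and local timer name; the "/" separator is an assumption. */
  static String fullyQualifiedTimerName(String transformName, String localName) {
    return transformName + "/" + localName;
  }

  /** Records both directions of the mapping for one timer. */
  void register(
      String transformName,
      String localName,
      String inputPCollectionId,
      String outputPCollectionId) {
    String name = fullyQualifiedTimerName(transformName, localName);
    inputPCollectionByTimer.put(name, inputPCollectionId);
    timerByOutputPCollection.put(outputPCollectionId, name);
  }

  /** Returns the PCollection id for sending a timer to the SDK, or null if unknown. */
  String inputPCollectionFor(String fullyQualifiedTimerName) {
    return inputPCollectionByTimer.get(fullyQualifiedTimerName);
  }

  /** Returns the timer name for an SDK output id, or null if the output is not a timer. */
  String timerForOutput(String outputPCollectionId) {
    return timerByOutputPCollection.get(outputPCollectionId);
  }

  public static void main(String[] args) {
    TimerNameIndex index = new TimerNameIndex();
    // The input id below is made up for the example.
    index.register("ParDo", "foo", "ParDo.timer.foo.in:0", "ParDo.timer.foo.out:0");
    System.out.println(index.timerForOutput("ParDo.timer.foo.out:0"));
    System.out.println(index.inputPCollectionFor("ParDo/foo"));
  }
}
```

With such an index, the `TimerData` could be keyed by the fully qualified name alone, and the two PCollection ids would only be needed at the boundary with the SDK harness.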
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165853=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165853 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 14/Nov/18 10:16 Start Date: 14/Nov/18 10:16 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438610754 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165853) Time Spent: 15h 50m (was: 15h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165852=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165852 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 14/Nov/18 10:16 Start Date: 14/Nov/18 10:16 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438610701 Run Python Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165852) Time Spent: 15h 40m (was: 15.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165677=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165677 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 22:49 Start Date: 13/Nov/18 22:49 Worklog Time Spent: 10m Work Description: tweise commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438468348 Run Python Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165677) Time Spent: 15h 20m (was: 15h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165672=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165672 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 22:28 Start Date: 13/Nov/18 22:28 Worklog Time Spent: 10m Work Description: tweise commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438462766 Run Python Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165672) Time Spent: 15h 10m (was: 15h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165645=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165645 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 21:19 Start Date: 13/Nov/18 21:19 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r233225330 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,13 +533,100 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +/** Key for timer which has not been registered yet. */ +Object getTimerKeyForRegistration() { + return keyForTimerToBeSet; +} + +/** Key for timer which is about to be fired. 
*/ +void setTimerKeyForFire(Object key) { + this.keyForTimerToBeFired = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String outputPCollectionId = Preconditions.checkNotNull(result.getKey()); +TupleTag tag = outputMap.get(outputPCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +outputPCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId); + TimerSpec timerSpec = extractTimerSpec(timerCollectionId); + Timer timer = + Preconditions.checkNotNull( + (Timer) ((KV) windowedValue.getValue()).getValue(), + "Received null Timer from SDK harness: %s", + windowedValue); + LOG.debug("Timer received: {} {}", outputPCollectionId, timer); + for (Object window : windowedValue.getWindows()) { +StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window); +TimerInternals.TimerData timerData = +TimerInternals.TimerData.of( +timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain()); +setTimer(windowedValue, timerData); + } +} + } +} + +private String extractTimerPCollectionId(String outputPCollectionId) { + return stageBundleFactory + .getProcessBundleDescriptor() + .getProcessBundleDescriptor() + .getPcollectionsMap() + .get(outputPCollectionId) + .getUniqueName(); Review comment: For example, I have `ParDo.timer.foo.out:0` which resolves to `ParDo.timer.output` instead of `ParDo.timer.foo`. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165645) Time Spent: 15h (was: 14h 50m)
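The lookup discussed in this review comment goes in two steps: the output PCollection id is first resolved to a unique name, and that name is then expected to identify a timer. A minimal sketch of that indirection follows, assuming plain maps in place of the process bundle descriptor. The class `TimerOutputResolver`, its fields, and the second timer `bar` are hypothetical. The sketch fails loudly when the unique name is not a known timer, which is the situation described above where an id resolves to `ParDo.timer.output`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of Beam: resolves SDK timer outputs to timer ids. */
final class TimerOutputResolver {
  // Stand-in for the descriptor's PCollection map: PCollection id -> unique name.
  private final Map<String, String> uniqueNameByPCollectionId;
  // Timer ids the runner knows about (stand-in for the keys of the timer reference map).
  private final Set<String> knownTimerIds;

  TimerOutputResolver(Map<String, String> uniqueNameByPCollectionId, Set<String> knownTimerIds) {
    this.uniqueNameByPCollectionId = uniqueNameByPCollectionId;
    this.knownTimerIds = knownTimerIds;
  }

  /** Resolves an output PCollection id to a timer id, or throws if either step fails. */
  String resolve(String outputPCollectionId) {
    String uniqueName = uniqueNameByPCollectionId.get(outputPCollectionId);
    if (uniqueName == null) {
      throw new IllegalStateException("Unknown output PCollection: " + outputPCollectionId);
    }
    if (!knownTimerIds.contains(uniqueName)) {
      throw new IllegalStateException(
          "Output " + outputPCollectionId + " resolved to " + uniqueName
              + ", which is not a known timer");
    }
    return uniqueName;
  }

  public static void main(String[] args) {
    Map<String, String> names = new HashMap<>();
    names.put("ParDo.timer.foo.out:0", "ParDo.timer.foo");
    Set<String> timers = new HashSet<>();
    timers.add("ParDo.timer.foo");
    TimerOutputResolver resolver = new TimerOutputResolver(names, timers);
    System.out.println(resolver.resolve("ParDo.timer.foo.out:0"));
  }
}
```

Checking the resolved name against the known timers turns an unstable unique name into an immediate, descriptive error instead of a later missing timer spec.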
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165644=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165644 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 21:16 Start Date: 13/Nov/18 21:16 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438439729 Run Website PreCommit Issue Time Tracking --- Worklog Id: (was: 165644) Time Spent: 14h 50m (was: 14h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165643=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165643 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 21:16 Start Date: 13/Nov/18 21:16 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438439587 Run Python Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165643) Time Spent: 14h 40m (was: 14.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165606=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165606 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 19:41 Start Date: 13/Nov/18 19:41 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r233194809 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,13 +533,100 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +/** Key for timer which has not been registered yet. */ +Object getTimerKeyForRegistration() { + return keyForTimerToBeSet; +} + +/** Key for timer which is about to be fired. 
*/ +void setTimerKeyForFire(Object key) { + this.keyForTimerToBeFired = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String outputPCollectionId = Preconditions.checkNotNull(result.getKey()); +TupleTag tag = outputMap.get(outputPCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +outputPCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + final String timerCollectionId = extractTimerPCollectionId(outputPCollectionId); + TimerSpec timerSpec = extractTimerSpec(timerCollectionId); + Timer timer = + Preconditions.checkNotNull( + (Timer) ((KV) windowedValue.getValue()).getValue(), + "Received null Timer from SDK harness: %s", + windowedValue); + LOG.debug("Timer received: {} {}", outputPCollectionId, timer); + for (Object window : windowedValue.getWindows()) { +StateNamespace namespace = StateNamespaces.window(windowCoder, (BoundedWindow) window); +TimerInternals.TimerData timerData = +TimerInternals.TimerData.of( +timerCollectionId, namespace, timer.getTimestamp(), timerSpec.getTimeDomain()); +setTimer(windowedValue, timerData); + } +} + } +} + +private String extractTimerPCollectionId(String outputPCollectionId) { + return stageBundleFactory + .getProcessBundleDescriptor() + .getProcessBundleDescriptor() + .getPcollectionsMap() + .get(outputPCollectionId) + .getUniqueName(); Review comment: The name here is somehow not stable anymore after #6986. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165606) Time Spent: 14h 10m (was: 14h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165566=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165566 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 18:33 Start Date: 13/Nov/18 18:33 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438385710 Looks like a Jenkins problem, looking into it.
```
18:55:59 Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00019a30, 1915748352, 0) failed; error='Cannot allocate memory' (errno=12)
18:55:59 #
18:55:59 # There is insufficient memory for the Java Runtime Environment to continue.
18:55:59 # Native memory allocation (mmap) failed to map 1915748352 bytes for committing reserved memory.
```
Issue Time Tracking --- Worklog Id: (was: 165566) Time Spent: 13.5h (was: 13h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165576=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165576 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 18:44 Start Date: 13/Nov/18 18:44 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438389218 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165576) Time Spent: 14h (was: 13h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165575=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165575 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 18:43 Start Date: 13/Nov/18 18:43 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438389186 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 165575) Time Spent: 13h 50m (was: 13h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165567=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165567 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 18:33 Start Date: 13/Nov/18 18:33 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438385817 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165567) Time Spent: 13h 40m (was: 13.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165556=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165556 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 18:09 Start Date: 13/Nov/18 18:09 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r233161421 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord streamRecord) { @Override protected DoFnRunner createWrappingDoFnRunner( DoFnRunner wrappedRunner) { -return new SdkHarnessDoFnRunner(); +sdkHarnessRunner = +new SdkHarnessDoFnRunner<>( +executableStage.getInputPCollection().getId(), +stageBundleFactory, +stateRequestHandler, +progressHandler, +outputManager, +outputMap, +executableStage.getTimers(), +(Coder) windowingStrategy.getWindowFn().windowCoder(), +(WindowedValue key, TimerInternals.TimerData timerData) -> { + try { +synchronized (getKeyedStateBackend()) { + setCurrentKey(keySelector.getKey(key)); + timerInternals.setTimer(timerData); +} + } catch (Exception e) { +throw new RuntimeException("Couldn't set key", e); + } +}, +() -> { + synchronized (getKeyedStateBackend()) { +ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey(); +@SuppressWarnings("ByteBufferBackingArray") +ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array()); +try { + return keyCoder.decode(byteStream); +} catch (IOException e) { + throw new RuntimeException( + String.format( + Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey)); +} + } +}); +return sdkHarnessRunner; } - private class SdkHarnessDoFnRunner implements DoFnRunner { + @Override + public void processWatermark(Watermark 
mark) throws Exception { +// Due to the asynchronous communication with the SDK harness, +// a bundle might still be in progress and not all items have +// yet been received from the SDK harness. If we just set this +// watermark as the new output watermark, we could violate the +// order of the records, i.e. pending items in the SDK harness +// could become "late" although they were "on time". +// +// We can solve this problem using one of the following options: +// +// 1) Finish the current bundle and emit this watermark as the +//new output watermark. Finishing the bundle ensures that +//all the items have been processed by the SDK harness and +//received by the outputQueue (see below), where they will +//have been emitted to the output stream. +// +// 2) Put a hold on the output watermark for as long as the current +//bundle has not been finished. We have to remember to manually +//finish the bundle in case we receive the final watermark. +//To avoid latency, we should process this watermark again as +//soon as the current bundle is finished. +// +// Approach 1) is the easiest, yet 2) gives better throughput due +// to the bundle not getting cut on every watermark. So we have +// implemented 2) below. +// +if (sdkHarnessRunner.isBundleInProgress()) { + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { +invokeFinishBundle(); + } else { +// It is not safe to advance the output watermark yet, so add a hold on the current +// output watermark. 
+setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold())); +sdkHarnessRunner.setBundleFinishedCallback( +() -> { + try { +processWatermark(mark); + } catch (Exception e) { +throw new RuntimeException( +"Failed to process pushed back watermark after finished bundle.", e); + } +}); + } +} +super.processWatermark(mark); + } + + private static class SdkHarnessDoFnRunner Review comment: Regarding the operator as a whole and as discussed in the past, constraints are mostly inherited and would need to be
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165554=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165554 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 18:05 Start Date: 13/Nov/18 18:05 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r233159888 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord streamRecord) { @Override protected DoFnRunner createWrappingDoFnRunner( DoFnRunner wrappedRunner) { -return new SdkHarnessDoFnRunner(); +sdkHarnessRunner = +new SdkHarnessDoFnRunner<>( +executableStage.getInputPCollection().getId(), +stageBundleFactory, +stateRequestHandler, +progressHandler, +outputManager, +outputMap, +executableStage.getTimers(), +(Coder) windowingStrategy.getWindowFn().windowCoder(), +(WindowedValue key, TimerInternals.TimerData timerData) -> { + try { +synchronized (getKeyedStateBackend()) { + setCurrentKey(keySelector.getKey(key)); + timerInternals.setTimer(timerData); +} + } catch (Exception e) { +throw new RuntimeException("Couldn't set key", e); + } +}, +() -> { + synchronized (getKeyedStateBackend()) { +ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey(); +@SuppressWarnings("ByteBufferBackingArray") +ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array()); +try { + return keyCoder.decode(byteStream); +} catch (IOException e) { + throw new RuntimeException( + String.format( + Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey)); +} + } +}); +return sdkHarnessRunner; } - private class SdkHarnessDoFnRunner implements DoFnRunner { + @Override + public void processWatermark(Watermark 
mark) throws Exception { +// Due to the asynchronous communication with the SDK harness, +// a bundle might still be in progress and not all items have +// yet been received from the SDK harness. If we just set this +// watermark as the new output watermark, we could violate the +// order of the records, i.e. pending items in the SDK harness +// could become "late" although they were "on time". +// +// We can solve this problem using one of the following options: +// +// 1) Finish the current bundle and emit this watermark as the +//new output watermark. Finishing the bundle ensures that +//all the items have been processed by the SDK harness and +//received by the outputQueue (see below), where they will +//have been emitted to the output stream. +// +// 2) Put a hold on the output watermark for as long as the current +//bundle has not been finished. We have to remember to manually +//finish the bundle in case we receive the final watermark. +//To avoid latency, we should process this watermark again as +//soon as the current bundle is finished. +// +// Approach 1) is the easiest, yet 2) gives better throughput due +// to the bundle not getting cut on every watermark. So we have +// implemented 2) below. +// +if (sdkHarnessRunner.isBundleInProgress()) { + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { +invokeFinishBundle(); + } else { +// It is not safe to advance the output watermark yet, so add a hold on the current +// output watermark. 
+setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold())); +sdkHarnessRunner.setBundleFinishedCallback( +() -> { + try { +processWatermark(mark); + } catch (Exception e) { +throw new RuntimeException( +"Failed to process pushed back watermark after finished bundle.", e); + } +}); + } +} +super.processWatermark(mark); + } + + private static class SdkHarnessDoFnRunner Review comment: I would agree with that if `SdkHarnessDoFnRunner` was a component with a prospect of being used independently. Instead it is
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165526=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165526 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 17:54 Start Date: 13/Nov/18 17:54 Worklog Time Spent: 10m Work Description: tweise commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438371397 Run Python Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165526) Time Spent: 13h (was: 12h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165525=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165525 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 17:50 Start Date: 13/Nov/18 17:50 Worklog Time Spent: 10m Work Description: tweise commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438370184 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165525) Time Spent: 12h 50m (was: 12h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165511 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 17:32 Start Date: 13/Nov/18 17:32 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438363264 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165511) Time Spent: 12h 20m (was: 12h 10m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165514=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165514 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 17:34 Start Date: 13/Nov/18 17:34 Worklog Time Spent: 10m Work Description: mxm removed a comment on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438363264 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165514) Time Spent: 12h 40m (was: 12.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165513=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165513 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 17:34 Start Date: 13/Nov/18 17:34 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438364394 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165513) Time Spent: 12.5h (was: 12h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165422=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165422 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 13:15 Start Date: 13/Nov/18 13:15 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438261605 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165422) Time Spent: 12h (was: 11h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165421=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165421 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 13:15 Start Date: 13/Nov/18 13:15 Worklog Time Spent: 10m Work Description: mxm commented on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438261549 I've addressed your comments @tweise. Issue Time Tracking --- Worklog Id: (was: 165421) Time Spent: 11h 50m (was: 11h 40m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165420=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165420 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 13:14 Start Date: 13/Nov/18 13:14 Worklog Time Spent: 10m Work Description: mxm removed a comment on issue #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#issuecomment-438261209 Run Java Flink PortableValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165420) Time Spent: 11h 40m (was: 11.5h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165375=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165375 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 11:27 Start Date: 13/Nov/18 11:27 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438232854 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 165375) Time Spent: 11h (was: 10h 50m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165376=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165376 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 11:27 Start Date: 13/Nov/18 11:27 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438232874 Run Java_Examples_Dataflow PreCommit Issue Time Tracking --- Worklog Id: (was: 165376) Time Spent: 11h 10m (was: 11h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165374=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165374 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 11:25 Start Date: 13/Nov/18 11:25 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232998285 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,16 +534,86 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +void setTimerKey(Object key) { + this.beforeFireTimerKey = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String inputCollectionId = result.getKey(); +TupleTag tag = outputMap.get(inputCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +inputCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + // process timer elements + // TODO This is ugly. 
There should be an easier way to retrieve the + String timerPCollectionId = + inputCollectionId.substring(0, inputCollectionId.length() - ".out:0".length()); + TimerReference timerReference = timerReferenceMap.get(timerPCollectionId); + if (timerReference != null) { +Timer timer = +Preconditions.checkNotNull( +(Timer) ((KV) windowedValue.getValue()).getValue(), +"Received null Timer from SDK harness: %s", +windowedValue); +LOG.debug("Timer received: {} {}", inputCollectionId, timer); +for (Object window : windowedValue.getWindows()) { + StateNamespace namespace = + StateNamespaces.window(windowCoder, (BoundedWindow) window); + TimerSpec timerSpec = extractTimerSpec(timerReference); + TimerInternals.TimerData timerData = + TimerInternals.TimerData.of( + timerPCollectionId, + namespace, + timer.getTimestamp(), + timerSpec.getTimeDomain()); + timerDataConsumer.accept(windowedValue, timerData); Review comment: Pushed a commit which removes the BiConsumer. Issue Time Tracking --- Worklog Id: (was: 165374) Time Spent: 10h 50m (was: 10h 40m)
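A note on the TODO in the hunk above: the timer PCollection id is recovered by cutting a fixed ".out:0" suffix off the output collection id without first checking that the suffix is present. The following is only a minimal sketch of a guarded variant, not code from the pull request. The class name TimerOutputIds, the method timerPCollectionId and the example ids are hypothetical; only the ".out:0" suffix comes from the diff.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Beam or of the pull request. */
final class TimerOutputIds {
  // Suffix taken from the quoted diff; everything else here is an assumption.
  private static final String TIMER_OUTPUT_SUFFIX = ".out:0";

  private TimerOutputIds() {}

  /** Returns the id without the suffix, or empty if the id does not end with it. */
  static Optional<String> timerPCollectionId(String outputCollectionId) {
    if (outputCollectionId == null || !outputCollectionId.endsWith(TIMER_OUTPUT_SUFFIX)) {
      return Optional.empty();
    }
    int end = outputCollectionId.length() - TIMER_OUTPUT_SUFFIX.length();
    return Optional.of(outputCollectionId.substring(0, end));
  }

  public static void main(String[] args) {
    // Example ids are made up for the demonstration.
    System.out.println(timerPCollectionId("myTimer.out:0"));
    System.out.println(timerPCollectionId("mainOutput"));
  }
}
```

Returning an Optional keeps the "not a timer output" case explicit, so a caller does not look up a wrongly truncated id in the timer reference map.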
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165363 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 11:01 Start Date: 13/Nov/18 11:01 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232990625 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -304,16 +342,141 @@ protected void addSideInputValue(StreamRecord streamRecord) { @Override protected DoFnRunner createWrappingDoFnRunner( DoFnRunner wrappedRunner) { -return new SdkHarnessDoFnRunner(); +sdkHarnessRunner = +new SdkHarnessDoFnRunner<>( +executableStage.getInputPCollection().getId(), +stageBundleFactory, +stateRequestHandler, +progressHandler, +outputManager, +outputMap, +executableStage.getTimers(), +(Coder) windowingStrategy.getWindowFn().windowCoder(), +(WindowedValue key, TimerInternals.TimerData timerData) -> { + try { +synchronized (getKeyedStateBackend()) { + setCurrentKey(keySelector.getKey(key)); + timerInternals.setTimer(timerData); +} + } catch (Exception e) { +throw new RuntimeException("Couldn't set key", e); + } +}, +() -> { + synchronized (getKeyedStateBackend()) { +ByteBuffer encodedKey = (ByteBuffer) getKeyedStateBackend().getCurrentKey(); +@SuppressWarnings("ByteBufferBackingArray") +ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array()); +try { + return keyCoder.decode(byteStream); +} catch (IOException e) { + throw new RuntimeException( + String.format( + Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey)); +} + } +}); +return sdkHarnessRunner; } - private class SdkHarnessDoFnRunner implements DoFnRunner { + @Override + public void processWatermark(Watermark 
mark) throws Exception { +// Due to the asynchronous communication with the SDK harness, +// a bundle might still be in progress and not all items have +// yet been received from the SDK harness. If we just set this +// watermark as the new output watermark, we could violate the +// order of the records, i.e. pending items in the SDK harness +// could become "late" although they were "on time". +// +// We can solve this problem using one of the following options: +// +// 1) Finish the current bundle and emit this watermark as the +//new output watermark. Finishing the bundle ensures that +//all the items have been processed by the SDK harness and +//received by the outputQueue (see below), where they will +//have been emitted to the output stream. +// +// 2) Put a hold on the output watermark for as long as the current +//bundle has not been finished. We have to remember to manually +//finish the bundle in case we receive the final watermark. +//To avoid latency, we should process this watermark again as +//soon as the current bundle is finished. +// +// Approach 1) is the easiest, yet 2) gives better throughput due +// to the bundle not getting cut on every watermark. So we have +// implemented 2) below. +// +if (sdkHarnessRunner.isBundleInProgress()) { + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { +invokeFinishBundle(); + } else { +// It is not safe to advance the output watermark yet, so add a hold on the current +// output watermark. +setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold())); +sdkHarnessRunner.setBundleFinishedCallback( +() -> { + try { +processWatermark(mark); + } catch (Exception e) { +throw new RuntimeException( +"Failed to process pushed back watermark after finished bundle.", e); + } +}); + } +} +super.processWatermark(mark); + } + + private static class SdkHarnessDoFnRunner Review comment: We could move this out of the operator now which would increase readability. 
If we don't split it into a separate static class,
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165357=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165357 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:39 Start Date: 13/Nov/18 10:39 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232982588 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2839,19 +2841,27 @@ public void testAbsoluteProcessingTimeTimerRejected() throws Exception { @ProcessElement public void processElement(@TimerId(timerId) Timer timer) { - timer.set(new Instant(0)); + try { +timer.set(new Instant(0)); +fail("Should have failed due to processing time with absolute timer."); Review comment: Yes, but not in processing time: https://github.com/apache/beam/commit/86bcf26c965f876be2e67aea769de3406ef25a97#diff-95b3ede99105b81444964de712a52386R967 Issue Time Tracking --- Worklog Id: (was: 165357) Time Spent: 10.5h (was: 10h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165355=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165355 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:33 Start Date: 13/Nov/18 10:33 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232980512 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -304,16 +344,126 @@ protected void addSideInputValue(StreamRecord streamRecord) { @Override protected DoFnRunner createWrappingDoFnRunner( DoFnRunner wrappedRunner) { -return new SdkHarnessDoFnRunner(); +sdkHarnessRunner = +new SdkHarnessDoFnRunner<>( +executableStage.getInputPCollection().getId(), +stageBundleFactory, +stateRequestHandler, +progressHandler, +outputManager, +outputMap, +executableStage.getTimers(), +(Coder) windowingStrategy.getWindowFn().windowCoder(), +(WindowedValue key, TimerInternals.TimerData timerData) -> { + try { +keyForTimerToBeSet = keySelector.getKey(key); +timerInternals.setTimer(timerData); + } catch (Exception e) { +throw new RuntimeException("Couldn't set timer", e); + } finally { +keyForTimerToBeSet = null; + } +}); +return sdkHarnessRunner; + } + + @Override + public void processWatermark(Watermark mark) throws Exception { +// Due to the asynchronous communication with the SDK harness, +// a bundle might still be in progress and not all items have +// yet been received from the SDk harness. If we just set this +// watermark as the new output watermark, we could violate the +// order of the records, i.e. pending items in the SDK harness +// could become "late" although they were "on time". 
+// +// We can solve this problem using one of the following options: +// +// 1) Finish the current bundle and emit this watermark as the +//new output watermark. Finishing the bundle ensures that +//all the items have been processed by the SDK harness and +//received by the outputQueue (see below), where they will +//have been emitted to the output stream. +// +// 2) Put a hold on the output watermark for as long as the current +//bundle has not been finished. We have to remember to manually +//finish the bundle in case we receive the final watermark. +//To avoid latency, we should process this watermark again as +//soon as the current bundle is finished. +// +// Approach 1) is the easiest, yet 2) gives better throughput due Review comment: Clarifying this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165355) Time Spent: 10h 10m (was: 10h) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 10h 10m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
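For readers following the watermark discussion, option 2) from the quoted comment can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: the class `WatermarkHoldSketch`, its fields, and the `FINAL_WATERMARK` constant are hypothetical stand-ins, and watermarks are modelled as plain `long` values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator; this is not Beam's ExecutableStageDoFnOperator. */
final class WatermarkHoldSketch {
  /** Assumption: the final watermark is modelled as Long.MAX_VALUE. */
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private boolean bundleInProgress;
  /** Replays the held watermark once the bundle finishes; null when nothing is held. */
  private Runnable bundleFinishedCallback;
  /** Records what would be emitted downstream, in order. */
  final List<Long> emittedWatermarks = new ArrayList<>();

  void startBundle() {
    bundleInProgress = true;
  }

  void processWatermark(long watermark) {
    if (bundleInProgress && watermark == FINAL_WATERMARK) {
      // No further elements will arrive to finish the bundle, so finish it manually.
      finishBundle();
    }
    if (bundleInProgress) {
      // Hold the watermark: outputs still pending in the bundle must not become late.
      // A newer watermark simply replaces an older held one.
      bundleFinishedCallback = () -> processWatermark(watermark);
      return;
    }
    emittedWatermarks.add(watermark);
  }

  void finishBundle() {
    bundleInProgress = false;
    Runnable callback = bundleFinishedCallback;
    bundleFinishedCallback = null;
    if (callback != null) {
      // Process the held watermark again as soon as the bundle is done.
      callback.run();
    }
  }

  public static void main(String[] args) {
    WatermarkHoldSketch operator = new WatermarkHoldSketch();
    operator.startBundle();
    operator.processWatermark(5L); // held, bundle still open
    operator.finishBundle();       // releases the held watermark
    System.out.println(operator.emittedWatermarks);
  }
}
```

The sketch keeps only the most recent held watermark, which seems sufficient because watermarks are monotonically increasing; whether the actual operator does the same is not shown in the quoted diff.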
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165356=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165356 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:33 Start Date: 13/Nov/18 10:33 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232980538 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ## @@ -442,22 +443,21 @@ public void close() throws Exception { } } - private long getPushbackWatermarkHold() { + long getPushbackWatermarkHold() { return pushedBackWatermark; } + void setPushedBackWatermark(long watermark) { Review comment: will change This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165356) Time Spent: 10h 20m (was: 10h 10m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 10h 20m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165354=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165354 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:32 Start Date: 13/Nov/18 10:32 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232980441 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,16 +534,86 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +void setTimerKey(Object key) { + this.beforeFireTimerKey = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String inputCollectionId = result.getKey(); +TupleTag tag = outputMap.get(inputCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +inputCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + // process timer elements + // TODO This is ugly. 
There should be an easier way to retrieve the + String timerPCollectionId = + inputCollectionId.substring(0, inputCollectionId.length() - ".out:0".length()); + TimerReference timerReference = timerReferenceMap.get(timerPCollectionId); + if (timerReference != null) { +Timer timer = +Preconditions.checkNotNull( +(Timer) ((KV) windowedValue.getValue()).getValue(), +"Received null Timer from SDK harness: %s", +windowedValue); +LOG.debug("Timer received: {} {}", inputCollectionId, timer); +for (Object window : windowedValue.getWindows()) { + StateNamespace namespace = + StateNamespaces.window(windowCoder, (BoundedWindow) window); + TimerSpec timerSpec = extractTimerSpec(timerReference); + TimerInternals.TimerData timerData = + TimerInternals.TimerData.of( + timerPCollectionId, + namespace, + timer.getTimestamp(), + timerSpec.getTimeDomain()); + timerDataConsumer.accept(windowedValue, timerData); Review comment: I agree it can be confusing. On the other hand, this decouples the timer generation code and the timer consumer code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165354) Time Spent: 10h (was: 9h 50m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 10h > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165353=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165353 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:32 Start Date: 13/Nov/18 10:32 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6981: [BEAM-4681] Add support for portable timers in Flink streaming mode URL: https://github.com/apache/beam/pull/6981#discussion_r232980105 ## File path: runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java ## @@ -225,18 +229,22 @@ public String getId() { @Override public Map>> getInputReceivers() { return ImmutableMap.of( -"pCollectionId", +"input", input -> { /* Ignore input*/ }); } @Override public void close() throws Exception { +if (onceEmitted) { Review comment: Close is only called once per bundle but there are two bundles now due to the watermark processing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165353) Time Spent: 9h 50m (was: 9h 40m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 9h 50m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165352=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165352 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:19 Start Date: 13/Nov/18 10:19 Worklog Time Spent: 10m Work Description: mxm commented on issue #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#issuecomment-438212646 Thanks for the review @ryan-williams! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165352) Time Spent: 9h 40m (was: 9.5h) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 9h 40m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165351=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165351 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:18 Start Date: 13/Nov/18 10:18 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232975079 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -270,8 +282,36 @@ public void setKeyContextElement1(StreamRecord record) throws Exception { @Override public void setCurrentKey(Object key) { -throw new UnsupportedOperationException( -"Current key for state backend can only be set by state requests from SDK workers."); +// We don't need to set anything, the key is set manually on the state backend +// This will be called by HeapInternalTimerService before a timer is fired +if (!usesTimers) { + throw new UnsupportedOperationException( + "Current key for state backend can only be set by state requests from SDK workers or when processing timers."); +} + } + + @Override + public Object getCurrentKey() { +// This is the key retrieved by HeapInternalTimerService when setting a Flink timer +return keyForTimerToBeSet; + } + + @Override + public void fireTimer(InternalTimer timer) { +// We need to decode the key +final ByteBuffer encodedKey = (ByteBuffer) timer.getKey(); +@SuppressWarnings("ByteBufferBackingArray") +ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array()); +final Object decodedKey; +try { + decodedKey = keyCoder.decode(byteStream); +} catch (IOException e) { + throw new RuntimeException( + String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey)); Review comment: Addressing this in #6981. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165351) Time Spent: 9.5h (was: 9h 20m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 9.5h > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165345=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165345 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:00 Start Date: 13/Nov/18 10:00 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232968422 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,16 +534,86 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +void setTimerKey(Object key) { + this.beforeFireTimerKey = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String inputCollectionId = result.getKey(); +TupleTag tag = outputMap.get(inputCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +inputCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + // process timer elements + // TODO This is ugly. There should be an easier way to retrieve the Review comment: Timer pCollection id, will change in #6981. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165345) Time Spent: 9h 10m (was: 9h) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 9h 10m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165342=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165342 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 09:57 Start Date: 13/Nov/18 09:57 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232967092 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -230,23 +329,103 @@ public void close() throws Exception { private final Collector collector; private final Map outputMap; +@Nullable private final TimerReceiverFactory timerReceiverFactory; ReceiverFactory(Collector collector, Map outputMap) { + this(collector, outputMap, null); +} + +ReceiverFactory( +Collector collector, +Map outputMap, +@Nullable TimerReceiverFactory timerReceiverFactory) { this.collector = collector; this.outputMap = outputMap; + this.timerReceiverFactory = timerReceiverFactory; } @Override public FnDataReceiver create(String collectionId) { Integer unionTag = outputMap.get(collectionId); - checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId); - int tagInt = unionTag; + if (unionTag != null) { +int tagInt = unionTag; +return receivedElement -> { + synchronized (collectorLock) { +collector.collect(new RawUnionValue(tagInt, receivedElement)); + } +}; + } else if (timerReceiverFactory != null) { +// Delegate to TimerReceiverFactory +return timerReceiverFactory.create(collectionId); + } else { +throw new IllegalStateException( +String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId)); + } +} + } + + private static class TimerReceiverFactory implements OutputReceiverFactory { + +/** Timer PCollection id => TimerReference. 
*/ +private final HashMap timerReferenceMap; +/** Timer PCollection id => timer name => TimerSpec. */ +private final Map> timerSpecMap; + +private final BiConsumer timerDataConsumer; +private final Coder windowCoder; + +TimerReceiverFactory( +Collection timerReferenceCollection, +Map> timerSpecMap, +BiConsumer timerDataConsumer, +Coder windowCoder) { + this.timerReferenceMap = new HashMap<>(); + for (TimerReference timerReference : timerReferenceCollection) { +timerReferenceMap.put(timerReference.collection().getId(), timerReference); + } + this.timerSpecMap = timerSpecMap; + this.timerDataConsumer = timerDataConsumer; + this.windowCoder = windowCoder; +} + +@Override +public FnDataReceiver create(String pCollectionId) { + // TODO This is ugly. There should be an easier way to retrieve the timer collectionid + String timerPCollectionId = + pCollectionId.substring(0, pCollectionId.length() - ".out:0".length()); Review comment: If it happens, it is definitely a bug. I suppose we could add a length-check to see if we can safely substring. The Precondition check afterwards should be sufficient to print a good error message. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165342) Time Spent: 9h (was: 8h 50m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 9h > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
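The length check suggested in the comment above could look something like the following. This is only a sketch under assumptions: the class and method names are hypothetical, and it checks for the `.out:0` suffix explicitly rather than relying on a later Precondition check.

```java
/** Hypothetical helper; the names are illustrative and not taken from the pull request. */
final class TimerCollectionIdSketch {
  private static final String OUTPUT_SUFFIX = ".out:0";

  /** Strips the output suffix, failing with a descriptive message if it is absent. */
  static String timerCollectionId(String pCollectionId) {
    if (pCollectionId == null
        || pCollectionId.length() <= OUTPUT_SUFFIX.length()
        || !pCollectionId.endsWith(OUTPUT_SUFFIX)) {
      throw new IllegalArgumentException(
          "Expected a PCollection id ending in '" + OUTPUT_SUFFIX + "' but got: " + pCollectionId);
    }
    return pCollectionId.substring(0, pCollectionId.length() - OUTPUT_SUFFIX.length());
  }

  public static void main(String[] args) {
    System.out.println(timerCollectionId("myTimer.out:0"));
  }
}
```

Without such a check, an id shorter than the suffix would make `substring` throw a `StringIndexOutOfBoundsException`, which says little about what went wrong.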
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165346=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165346 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 10:00 Start Date: 13/Nov/18 10:00 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232968493 ## File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java ## @@ -85,17 +86,38 @@ .setInput("input") .setComponents( Components.newBuilder() + .putTransforms( + "transform", + RunnerApi.PTransform.newBuilder() + .putInputs("bla", "input") + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PAR_DO_TRANSFORM_URN)) + .build()) .putPcollections("input", PCollection.getDefaultInstance()) .build()) + .addUserStates( + ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build()) .build(); private final JobInfo jobInfo = JobInfo.create("job-id", "job-name", "retrieval-token", Struct.getDefaultInstance()); @Before - public void setUpMocks() { + public void setUpMocks() throws Exception { MockitoAnnotations.initMocks(this); when(runtimeContext.getDistributedCache()).thenReturn(distributedCache); when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory); +RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class); +when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(remoteBundle); +//ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor = Review comment: Nope, thanks for spotting. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165346) Time Spent: 9h 20m (was: 9h 10m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 9h 20m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165341=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165341 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 09:57 Start Date: 13/Nov/18 09:57 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232967092 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -230,23 +329,103 @@ public void close() throws Exception { private final Collector collector; private final Map outputMap; +@Nullable private final TimerReceiverFactory timerReceiverFactory; ReceiverFactory(Collector collector, Map outputMap) { + this(collector, outputMap, null); +} + +ReceiverFactory( +Collector collector, +Map outputMap, +@Nullable TimerReceiverFactory timerReceiverFactory) { this.collector = collector; this.outputMap = outputMap; + this.timerReceiverFactory = timerReceiverFactory; } @Override public FnDataReceiver create(String collectionId) { Integer unionTag = outputMap.get(collectionId); - checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId); - int tagInt = unionTag; + if (unionTag != null) { +int tagInt = unionTag; +return receivedElement -> { + synchronized (collectorLock) { +collector.collect(new RawUnionValue(tagInt, receivedElement)); + } +}; + } else if (timerReceiverFactory != null) { +// Delegate to TimerReceiverFactory +return timerReceiverFactory.create(collectionId); + } else { +throw new IllegalStateException( +String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId)); + } +} + } + + private static class TimerReceiverFactory implements OutputReceiverFactory { + +/** Timer PCollection id => TimerReference. 
*/ +private final HashMap timerReferenceMap; +/** Timer PCollection id => timer name => TimerSpec. */ +private final Map> timerSpecMap; + +private final BiConsumer timerDataConsumer; +private final Coder windowCoder; + +TimerReceiverFactory( +Collection timerReferenceCollection, +Map> timerSpecMap, +BiConsumer timerDataConsumer, +Coder windowCoder) { + this.timerReferenceMap = new HashMap<>(); + for (TimerReference timerReference : timerReferenceCollection) { +timerReferenceMap.put(timerReference.collection().getId(), timerReference); + } + this.timerSpecMap = timerSpecMap; + this.timerDataConsumer = timerDataConsumer; + this.windowCoder = windowCoder; +} + +@Override +public FnDataReceiver create(String pCollectionId) { + // TODO This is ugly. There should be an easier way to retrieve the timer collectionid + String timerPCollectionId = + pCollectionId.substring(0, pCollectionId.length() - ".out:0".length()); Review comment: If it happens, it is definitely a bug. I suppose we could add a length-check to see if we can safely substring. The Precondition check afterwards should be sufficient to print a good error message. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165341) Time Spent: 8h 50m (was: 8h 40m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 8h 50m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165339=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165339 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 09:56 Start Date: 13/Nov/18 09:56 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232967092 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -230,23 +329,103 @@ public void close() throws Exception { private final Collector collector; private final Map outputMap; +@Nullable private final TimerReceiverFactory timerReceiverFactory; ReceiverFactory(Collector collector, Map outputMap) { + this(collector, outputMap, null); +} + +ReceiverFactory( +Collector collector, +Map outputMap, +@Nullable TimerReceiverFactory timerReceiverFactory) { this.collector = collector; this.outputMap = outputMap; + this.timerReceiverFactory = timerReceiverFactory; } @Override public FnDataReceiver create(String collectionId) { Integer unionTag = outputMap.get(collectionId); - checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId); - int tagInt = unionTag; + if (unionTag != null) { +int tagInt = unionTag; +return receivedElement -> { + synchronized (collectorLock) { +collector.collect(new RawUnionValue(tagInt, receivedElement)); + } +}; + } else if (timerReceiverFactory != null) { +// Delegate to TimerReceiverFactory +return timerReceiverFactory.create(collectionId); + } else { +throw new IllegalStateException( +String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId)); + } +} + } + + private static class TimerReceiverFactory implements OutputReceiverFactory { + +/** Timer PCollection id => TimerReference. 
*/ +private final HashMap timerReferenceMap; +/** Timer PCollection id => timer name => TimerSpec. */ +private final Map> timerSpecMap; + +private final BiConsumer timerDataConsumer; +private final Coder windowCoder; + +TimerReceiverFactory( +Collection timerReferenceCollection, +Map> timerSpecMap, +BiConsumer timerDataConsumer, +Coder windowCoder) { + this.timerReferenceMap = new HashMap<>(); + for (TimerReference timerReference : timerReferenceCollection) { +timerReferenceMap.put(timerReference.collection().getId(), timerReference); + } + this.timerSpecMap = timerSpecMap; + this.timerDataConsumer = timerDataConsumer; + this.windowCoder = windowCoder; +} + +@Override +public FnDataReceiver create(String pCollectionId) { + // TODO This is ugly. There should be an easier way to retrieve the timer collectionid + String timerPCollectionId = + pCollectionId.substring(0, pCollectionId.length() - ".out:0".length()); Review comment: If it happens, it is definitely a bug. I suppose we could add a length-check to see if we can safely substring. The Precondition check AFAIK is enough afterwards. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165339) Time Spent: 8h 40m (was: 8.5h) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 8h 40m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165338 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 09:56 Start Date: 13/Nov/18 09:56 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232967073 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -270,8 +282,36 @@ public void setKeyContextElement1(StreamRecord record) throws Exception { @Override public void setCurrentKey(Object key) { -throw new UnsupportedOperationException( -"Current key for state backend can only be set by state requests from SDK workers."); +// We don't need to set anything, the key is set manually on the state backend +// This will be called by HeapInternalTimerService before a timer is fired +if (!usesTimers) { + throw new UnsupportedOperationException( + "Current key for state backend can only be set by state requests from SDK workers or when processing timers."); +} + } + + @Override + public Object getCurrentKey() { +// This is the key retrieved by HeapInternalTimerService when setting a Flink timer +return keyForTimerToBeSet; + } + + @Override + public void fireTimer(InternalTimer timer) { +// We need to decode the key +final ByteBuffer encodedKey = (ByteBuffer) timer.getKey(); +@SuppressWarnings("ByteBufferBackingArray") +ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array()); +final Object decodedKey; +try { + decodedKey = keyCoder.decode(byteStream); +} catch (IOException e) { + throw new RuntimeException( + String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey)); Review comment: I think we could do `Arrays.toString(buffer.array())`. 
Issue Time Tracking
---

Worklog Id: (was: 165338)
Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165288=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165288 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 13/Nov/18 03:56
Start Date: 13/Nov/18 03:56
Worklog Time Spent: 10m
Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r232889550

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ##
@@ -359,16 +534,86 @@ public void finishBundle() {
         emitResults();
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
+      } finally {
+        remoteBundle = null;
+      }
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
       }
     }

+    void setTimerKey(Object key) {
+      this.beforeFireTimerKey = key;
+    }
+
+    boolean isBundleInProgress() {
+      return remoteBundle != null;
+    }
+
+    void setBundleFinishedCallback(Runnable callback) {
+      this.bundleFinishedCallback = callback;
+    }
+
     private void emitResults() {
       KV result;
       while ((result = outputQueue.poll()) != null) {
-        outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue());
+        final String inputCollectionId = result.getKey();
+        TupleTag tag = outputMap.get(inputCollectionId);
+        WindowedValue windowedValue =
+            Preconditions.checkNotNull(
+                (WindowedValue) result.getValue(),
+                "Received a null value from the SDK harness for %s",
+                inputCollectionId);
+        if (tag != null) {
+          // process regular elements
+          outputManager.output(tag, windowedValue);
+        } else {
+          // process timer elements
+          // TODO This is ugly. There should be an easier way to retrieve the

Review comment:
not sure if this is right but:
```suggestion
          // TODO This is ugly. There should be an easier way to retrieve the timer ID
```

Issue Time Tracking
---

Worklog Id: (was: 165288)
Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165286=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165286 ]

ASF GitHub Bot logged work on BEAM-4681:

Author: ASF GitHub Bot
Created on: 13/Nov/18 03:56
Start Date: 13/Nov/18 03:56
Worklog Time Spent: 10m
Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode
URL: https://github.com/apache/beam/pull/7008#discussion_r232890143

## File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java ##
@@ -85,17 +86,38 @@
           .setInput("input")
           .setComponents(
               Components.newBuilder()
+                  .putTransforms(
+                      "transform",
+                      RunnerApi.PTransform.newBuilder()
+                          .putInputs("bla", "input")
+                          .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PAR_DO_TRANSFORM_URN))
+                          .build())
                   .putPcollections("input", PCollection.getDefaultInstance())
                   .build())
+          .addUserStates(
+              ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build())
           .build();
   private final JobInfo jobInfo = JobInfo.create("job-id", "job-name", "retrieval-token", Struct.getDefaultInstance());

   @Before
-  public void setUpMocks() {
+  public void setUpMocks() throws Exception {
     MockitoAnnotations.initMocks(this);
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+    RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class);
+    when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(remoteBundle);
+    //ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor =

Review comment:
intentionally leaving these here?

Issue Time Tracking
---

Worklog Id: (was: 165286)
Time Spent: 8h (was: 7h 50m)