[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17205583#comment-17205583 ]

Kostas Kloudas commented on FLINK-6215:
---

Yes [~sewen], I will close the issue. The PRs are already closed.

> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to
> emit first and keeps them in memory. This is not scalable as for large
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such
> partition.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
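A minimal sketch of the pre-partitioning idea from the issue description, assuming an inclusive range (as in `StatefulSequenceSource(start, end)`) and assuming that leftover elements go to the first partitions. The names `SequencePartitioner` and `partition` are hypothetical and this is not the code from the PR; it only shows that each partition shrinks to two `long`s (its next offset and its end), however many elements it covers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pre-partition the inclusive range [start, end] into
// min(maxParallelism, totalElements) contiguous partitions.
class SequencePartitioner {

    /** Per-partition state: the next value to emit and the partition's inclusive end. */
    static final class Partition {
        final long nextOffset;
        final long end;

        Partition(long nextOffset, long end) {
            this.nextOffset = nextOffset;
            this.end = end;
        }
    }

    static List<Partition> partition(long start, long end, int maxParallelism) {
        if (start > end || maxParallelism <= 0) {
            throw new IllegalArgumentException("expected start <= end and maxParallelism > 0");
        }
        long totalElements = end - start + 1;
        if (totalElements <= 0) {
            // end - start + 1 overflowed; this sketch does not handle ranges that large
            throw new IllegalArgumentException("range too large for this sketch");
        }
        int numPartitions = (int) Math.min(maxParallelism, totalElements);
        long baseSize = totalElements / numPartitions;
        long remainder = totalElements % numPartitions;

        List<Partition> partitions = new ArrayList<>(numPartitions);
        long partitionStart = start;
        for (int i = 0; i < numPartitions; i++) {
            // Assumption of this sketch: the first `remainder` partitions take one extra element.
            long size = baseSize + (i < remainder ? 1 : 0);
            long partitionEnd = partitionStart + size - 1;
            partitions.add(new Partition(partitionStart, partitionEnd));
            partitionStart = partitionEnd + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        for (Partition p : partition(0, 9, 4)) {
            System.out.println("[" + p.nextOffset + ", " + p.end + "]");
        }
    }
}
```

With `partition(0, 9, 4)` the ten elements are split into `[0, 2]`, `[3, 5]`, `[6, 7]` and `[8, 9]`; a range with fewer elements than `maxParallelism` gets one partition per element.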
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17205572#comment-17205572 ]

Stephan Ewen commented on FLINK-6215:
---

This issue should be subsumed by this one: FLINK-19457

[~kkl0u] Should we close this and the related PRs?
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698901#comment-16698901 ]

ASF GitHub Bot commented on FLINK-6215:
---

kl0u closed pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scalable.
URL: https://github.com/apache/flink/pull/3669

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 84a9700cf8f..e3dfd6b9409 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -67,7 +67,7 @@
  * ++ ++ ++ ++ ++
  *
- * Recovering the checkpoint with parallelism = 5 yields the following state assignment:
+ * Recovering the checkpoint with parallelism = 2 yields the following state assignment:
  *
  * func_1 func_2
  * ++++ +++
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index bdb12f39c3d..d9784e4001f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -20,25 +20,30 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
 /**
  * A stateful streaming source that emits each number from a given interval exactly once,
  * possibly in parallel.
  *
- * For the source to be re-scalable, the first time the job is run, we precompute all the elements
- * that each of the tasks should emit and upon checkpointing, each element constitutes its own
- * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
+ * For the source to be re-scalable, the range of elements to be emitted is initially (at the first execution)
+ * split into {@code min(maxParallelism, totalNumberOfElements)} partitions, and for each one, we
+ * store the {@code nextOffset}, i.e. the next element to be emitted, and its {@code end}. Upon rescaling, these
+ * partitions can be reshuffled among the new tasks, and these will resume emitting from where their predecessors
+ * left off.
  *
- * This strategy guarantees that each element will be emitted exactly-once, but elements will not
- * necessarily be emitted in ascending order, even for the same tasks.
+ * Although each element will be emitted exactly-once, elements will not necessarily be emitted in ascending order,
+ * even for the same task.
  */
 @PublicEvolving
 public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
@@ -50,9 +55,8 @@
 	private volatile boolean isRunning = true;
-	private transient Deque<Long> valuesToEmit;
-
-	private transient ListState<Long> checkpointedState;
+	private transient Map<Long, Long> endToNextOffsetMapping;
+	private transient ListState<Tuple2<Long, Long>> checkpointedState;
 	/**
 	 * Creates a source that emits all numbers from the given interval exactly once.
@@ -61,6 +65,7 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
@@ -68,45 +73,81 @@ public StatefulSequenceSource(long start, long end) {
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-
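The Javadoc in this diff describes the state as a set of `(end, nextOffset)` entries that can be reshuffled among tasks on rescaling. The following plain-Java simulation (it does not use Flink's runtime) sketches why that preserves exactly-once emission: eight partitions of ten elements start on two tasks, each task emits part of its range, the state is snapshotted right after that round, redistributed to three tasks, and drained. The contiguous-slice split in `restore` is a simplified stand-in for how Flink redistributes operator list state on restore, and `RescaleSimulation` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink code) of partition state being reshuffled on rescaling.
// Each task's state maps a partition's inclusive end to the next offset it should emit.
class RescaleSimulation {

    /** Emits up to `budget` elements from a task's partitions, advancing each nextOffset in place. */
    static void emit(Map<Long, Long> endToNextOffset, int budget, List<Long> out) {
        for (Map.Entry<Long, Long> e : endToNextOffset.entrySet()) {
            long end = e.getKey();
            long next = e.getValue();
            while (budget > 0 && next <= end) {
                out.add(next++);
                budget--;
            }
            e.setValue(next); // next > end marks an exhausted partition
        }
    }

    /** Snapshot: one {end, nextOffset} pair per partition, across all tasks. */
    static List<long[]> snapshot(List<Map<Long, Long>> tasks) {
        List<long[]> state = new ArrayList<>();
        for (Map<Long, Long> task : tasks) {
            for (Map.Entry<Long, Long> e : task.entrySet()) {
                state.add(new long[] {e.getKey(), e.getValue()});
            }
        }
        return state;
    }

    /** Restore: split the snapshot into `parallelism` contiguous slices (simplified redistribution). */
    static List<Map<Long, Long>> restore(List<long[]> state, int parallelism) {
        List<Map<Long, Long>> tasks = new ArrayList<>();
        int base = state.size() / parallelism;
        int remainder = state.size() % parallelism;
        int idx = 0;
        for (int t = 0; t < parallelism; t++) {
            Map<Long, Long> task = new HashMap<>();
            int size = base + (t < remainder ? 1 : 0);
            for (int i = 0; i < size; i++, idx++) {
                task.put(state.get(idx)[0], state.get(idx)[1]);
            }
            tasks.add(task);
        }
        return tasks;
    }

    /** Range [0, 79] in 8 partitions of 10: run on 2 tasks, snapshot, rescale to 3 tasks, drain. */
    static List<Long> run() {
        List<Long> emitted = new ArrayList<>();
        List<Map<Long, Long>> tasks = new ArrayList<>();
        tasks.add(new HashMap<>());
        tasks.add(new HashMap<>());
        for (long p = 0; p < 8; p++) {
            // partition p covers [10p, 10p + 9]; task 0 owns partitions 0-3, task 1 owns 4-7
            tasks.get(p < 4 ? 0 : 1).put(10 * p + 9, 10 * p);
        }
        for (Map<Long, Long> task : tasks) {
            emit(task, 15, emitted); // partial progress before the snapshot
        }
        List<Map<Long, Long>> rescaled = restore(snapshot(tasks), 3);
        for (Map<Long, Long> task : rescaled) {
            emit(task, Integer.MAX_VALUE, emitted); // drain whatever is left
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The combined output is not in ascending order (the second task's values of 40 and above are emitted before some of the first task's smaller values), which matches the caveat in the Javadoc, yet every value in `0..79` appears exactly once.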
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104907#comment-16104907 ]

ASF GitHub Bot commented on FLINK-6215:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3669#discussion_r130081125

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		Preconditions.checkState(this.checkpointedState == null,
+		Preconditions.checkState(checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
 			new ListStateDescriptor<>(
-				"stateful-sequence-source-state",
-				LongSerializer.INSTANCE
+				"stateful-sequence-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Long, Long>>) (Class) Tuple2.class,
+					new TypeSerializer[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+				)
 			)
 		);
-		this.valuesToEmit = new ArrayDeque<>();
+		this.endToNextOffsetMapping = new HashMap<>();
 		if (context.isRestored()) {
-			// upon restoring
-
-			for (Long v : this.checkpointedState.get()) {
-				this.valuesToEmit.add(v);
+			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
--- End diff --

nit: empty spaces should surround ":"

> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to
> emit first and keeps them in memory. This is not scalable as for large
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such
> partition.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
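The loop under review rebuilds the `end -> nextOffset` map from the restored list state and rejects a duplicate end, since two entries with the same end would mean two partitions claim the same range. Below is a self-contained sketch of that check, with a `long[]` pair standing in for Flink's `Tuple2<Long, Long>` and a plain `IllegalStateException` in place of `Preconditions.checkState`; `RestoreSketch` is a hypothetical name. It relies on `Map.put` returning the previous value (or `null`), exactly as the quoted code does.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the restore loop: rebuild end -> nextOffset from restored
// (end, nextOffset) pairs and fail on a duplicate end.
class RestoreSketch {

    static Map<Long, Long> restore(List<long[]> restoredPairs) {
        Map<Long, Long> endToNextOffset = new HashMap<>();
        for (long[] partitionInfo : restoredPairs) {
            // partitionInfo[0] is the partition's end, partitionInfo[1] its next offset
            Long prev = endToNextOffset.put(partitionInfo[0], partitionInfo[1]);
            if (prev != null) {
                // Partitions are disjoint, so an end may appear only once in valid state.
                throw new IllegalStateException("Duplicate entry when restoring: end=" + partitionInfo[0]);
            }
        }
        return endToNextOffset;
    }

    public static void main(String[] args) {
        List<long[]> pairs = Arrays.asList(new long[] {9, 3}, new long[] {19, 10});
        System.out.println(restore(pairs));
    }
}
```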
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104906#comment-16104906 ]

ASF GitHub Bot commented on FLINK-6215:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3669#discussion_r130080485

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		Preconditions.checkState(this.checkpointedState == null,
+		Preconditions.checkState(checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
 			new ListStateDescriptor<>(
-				"stateful-sequence-source-state",
-				LongSerializer.INSTANCE
+				"stateful-sequence-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Long, Long>>) (Class) Tuple2.class,
+					new TypeSerializer[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+				)
 			)
 		);
-		this.valuesToEmit = new ArrayDeque<>();
+		this.endToNextOffsetMapping = new HashMap<>();
 		if (context.isRestored()) {
-			// upon restoring
-
-			for (Long v : this.checkpointedState.get()) {
-				this.valuesToEmit.add(v);
+			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
+				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
+				Preconditions.checkState(prev == null,
+					getClass().getSimpleName() + " : Duplicate entry when restoring.");
 			}
 		} else {
-			// the first time the job is executed
-
-			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
-			final long congruence = start + taskIdx;
+			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			final long totalElements = Math.abs(end - start + 1L);
+			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
-			long totalNoOfElements = Math.abs(end - start + 1);
-			final int baseSize = safeDivide(totalNoOfElements, stepSize);
-			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
+			int localStartIdx = localPartitionRange.f0;
+			int localEndIdx = localStartIdx + localPartitionRange.f1;
-			for (long collected = 0; collected < toCollect; collected++) {
-				this.valuesToEmit.add(collected * stepSize + congruence);
+			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
+				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
+				endToNextOffsetMapping.put(limits.f1, limits.f0);
 			}
 		}
 	}
+	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
+		int minPartitionSliceSize = totalPartitions / parallelTasks;
+		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
+
+		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
+		int localRangeSize =
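The `getLocalRange` helper in this hunk gives each task a contiguous slice of partition indices, with the first `remainingPartitions` tasks taking one extra partition. The quoted diff is cut off before `localRangeSize` is computed, so the size expression below is an assumption chosen to be consistent with the start-index formula, not the PR's code; `totalPartitions` reproduces the clamping expression from the hunk. `LocalRangeSketch` and its methods are hypothetical names.

```java
// Hypothetical sketch of assigning contiguous slices of partition indices to tasks.
class LocalRangeSketch {

    /** Same clamping as in the hunk: at most maxParallelism partitions, at most one per element. */
    static int totalPartitions(long totalElements, int maxParallelism) {
        return totalElements < Integer.MAX_VALUE
                ? Math.min(maxParallelism, (int) totalElements)
                : maxParallelism;
    }

    /** Returns {startIdx, size} of the partition indices owned by task `taskIdx`. */
    static int[] localRange(int totalPartitions, int parallelTasks, int taskIdx) {
        int minSliceSize = totalPartitions / parallelTasks;
        int remaining = totalPartitions - minSliceSize * parallelTasks;
        // Start index: same formula as getLocalRange in the quoted diff.
        int startIdx = taskIdx * minSliceSize + Math.min(taskIdx, remaining);
        // Size: assumed; the first `remaining` tasks take one extra partition.
        int size = minSliceSize + (taskIdx < remaining ? 1 : 0);
        return new int[] {startIdx, size};
    }

    public static void main(String[] args) {
        int partitions = totalPartitions(1_000L, 10);
        for (int t = 0; t < 4; t++) {
            int[] r = localRange(partitions, 4, t);
            System.out.println("task " + t + ": partitions " + r[0] + ".." + (r[0] + r[1] - 1));
        }
    }
}
```

For 10 partitions over 4 tasks this yields the slices 0..2, 3..5, 6..7 and 8..9. When there are more tasks than partitions, the surplus tasks get an empty slice and simply own no state.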
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104908#comment-16104908 ]

ASF GitHub Bot commented on FLINK-6215:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3669#discussion_r130081876

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		Preconditions.checkState(this.checkpointedState == null,
+		Preconditions.checkState(checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
 			new ListStateDescriptor<>(
-				"stateful-sequence-source-state",
-				LongSerializer.INSTANCE
+				"stateful-sequence-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Long, Long>>) (Class) Tuple2.class,
+					new TypeSerializer[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+				)
 			)
 		);
-		this.valuesToEmit = new ArrayDeque<>();
+		this.endToNextOffsetMapping = new HashMap<>();
 		if (context.isRestored()) {
-			// upon restoring
-
-			for (Long v : this.checkpointedState.get()) {
-				this.valuesToEmit.add(v);
+			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
+				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
+				Preconditions.checkState(prev == null,
+					getClass().getSimpleName() + " : Duplicate entry when restoring.");
 			}
 		} else {
-			// the first time the job is executed
-
-			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
-			final long congruence = start + taskIdx;
+			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			final long totalElements = Math.abs(end - start + 1L);
+			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
-			long totalNoOfElements = Math.abs(end - start + 1);
-			final int baseSize = safeDivide(totalNoOfElements, stepSize);
-			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
+			int localStartIdx = localPartitionRange.f0;
+			int localEndIdx = localStartIdx + localPartitionRange.f1;
-			for (long collected = 0; collected < toCollect; collected++) {
-				this.valuesToEmit.add(collected * stepSize + congruence);
+			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
+				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
+				endToNextOffsetMapping.put(limits.f1, limits.f0);
 			}
 		}
 	}
+	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
+		int minPartitionSliceSize = totalPartitions / parallelTasks;
+		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
+
+		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
+		int localRangeSize =
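For contrast, the code removed in this hunk assigned values round-robin (task `i` of `p` emits `start + i`, `start + i + p`, and so on) and materialized all of them in a `Deque` up front, which is the memory problem the issue describes. The standalone sketch below reproduces that removed logic, using plain `long` division instead of the original `safeDivide` helper, whose body is not shown; `OldRoundRobinSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical standalone version of the removed round-robin logic.
class OldRoundRobinSketch {

    /** Values task `taskIdx` of `parallelism` pre-computes for the inclusive range [start, end]. */
    static Deque<Long> valuesToEmit(long start, long end, int parallelism, int taskIdx) {
        long total = end - start + 1;
        long baseSize = total / parallelism;
        // The first (total % parallelism) tasks emit one extra value.
        long toCollect = (total % parallelism > taskIdx) ? baseSize + 1 : baseSize;
        long congruence = start + taskIdx;
        Deque<Long> values = new ArrayDeque<>();
        for (long collected = 0; collected < toCollect; collected++) {
            values.add(collected * parallelism + congruence); // every value is materialized up front
        }
        return values;
    }

    public static void main(String[] args) {
        for (int t = 0; t < 3; t++) {
            System.out.println("task " + t + ": " + valuesToEmit(0, 9, 3, t));
        }
    }
}
```

Running `main` prints `task 0: [0, 3, 6, 9]`, `task 1: [1, 4, 7]` and `task 2: [2, 5, 8]`. Each task's deque holds roughly `(end - start + 1) / p` boxed `Long`s, so memory grows with the size of the range rather than with the number of partitions.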
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16004341#comment-16004341 ]

ASF GitHub Bot commented on FLINK-6215:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3669

I think this would be crucial. As it is in the code right now, it does not even work for any meaningful tests.
+1 for putting this into 1.3 as a bugfix.

> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to
> emit first and keeps them in memory. This is not scalable as for large
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such
> partition.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003111#comment-16003111 ]

ASF GitHub Bot commented on FLINK-6215:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3669

@greghogan Not sure. As I understand it, the source was made for testing (which is why the old implementation was not efficient), but there may also be some more serious users. Any comments, @aljoscha?
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003105#comment-16003105 ]

ASF GitHub Bot commented on FLINK-6215:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3669

@kl0u how important is this for 1.3?
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954925#comment-15954925 ]

ASF GitHub Bot commented on FLINK-6215:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/3669

[FLINK-6215] Make the StatefulSequenceSource scalable.

So far, this source computed all the elements to be emitted and stored them in memory. This could lead to out-of-memory problems for large deployments. Now we split the range of elements into partitions that can be re-shuffled upon rescaling, and upon checkpointing we only store the next offset and the end of each partition.

The current version of the PR is not backwards compatible, as compatibility becomes tricky given that we change the semantics of the state that we store. I believe that this is OK, given that this fix has to go into 1.3 and we are not sure whether people are actually using the source in production, i.e. in settings that need backwards compatibility. What do you think, @aljoscha @StephanEwen?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink stateful-src

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3669

commit cf333b0b0c318569a1704ca71121c37dcd12bd3d
Author: kl0u
Date:   2017-03-29T16:21:02Z

    [FLINK-6215] Make the StatefulSequenceSource scalable.

    So far this source was computing all the elements to be emitted and stored them in memory. This could lead to out-of-memory problems for large deployments. Now we do split the range of elements into partitions that can be re-shuffled upon rescaling and we just store the next offset and the end of each one of them upon checkpointing.