[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2020-10-01 Thread Kostas Kloudas (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17205583#comment-17205583
 ] 

Kostas Kloudas commented on FLINK-6215:
---

Yes [~sewen], I will close the issue. The PRs are already closed.

> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.
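A minimal sketch of the pre-partitioning described in the issue, assuming an even split of the closed range [start, end] into at most maxParallelism partitions; the class and names below are illustrative and not Flink's actual implementation.

// Hypothetical sketch (not the Flink implementation): pre-split the range
// [start, end] into at most maxParallelism partitions and keep only the
// per-partition progress, so the full element list is never materialized.
import java.util.ArrayList;
import java.util.List;

public class RangePartitioningSketch {

	/** Progress of one partition: the next element to emit and the (inclusive) end. */
	static final class Partition {
		long nextOffset;
		final long end;

		Partition(long nextOffset, long end) {
			this.nextOffset = nextOffset;
			this.end = end;
		}
	}

	/** Splits [start, end] (start <= end assumed) into min(maxParallelism, totalElements) contiguous partitions. */
	static List<Partition> prePartition(long start, long end, int maxParallelism) {
		long totalElements = end - start + 1;
		int partitions = (int) Math.min(maxParallelism, totalElements);
		List<Partition> result = new ArrayList<>(partitions);

		long baseSize = totalElements / partitions;
		long remainder = totalElements % partitions;
		long next = start;
		for (int i = 0; i < partitions; i++) {
			// the first 'remainder' partitions get one extra element each
			long size = baseSize + (i < remainder ? 1 : 0);
			result.add(new Partition(next, next + size - 1));
			next += size;
		}
		return result;
	}
}

Checkpointing then only needs the (nextOffset, end) pair of each partition, which stays bounded by maxParallelism regardless of the sequence length.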



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2020-10-01 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17205572#comment-17205572
 ] 

Stephan Ewen commented on FLINK-6215:
-

This issue should be subsumed by FLINK-19457.

[~kkl0u] Should we close this and the related PRs?

> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698901#comment-16698901
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

kl0u closed pull request #3669: [FLINK-6215] Make the StatefulSequenceSource 
scalable.
URL: https://github.com/apache/flink/pull/3669
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 84a9700cf8f..e3dfd6b9409 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -67,7 +67,7 @@
  * ++   ++   ++   ++   ++
  * 
 
- * Recovering the checkpoint with parallelism = 5 yields the following state assignment:
+ * Recovering the checkpoint with parallelism = 2 yields the following state assignment:
  * 
  *  func_1  func_2
  * ++++   +++
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index bdb12f39c3d..d9784e4001f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -20,25 +20,30 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * A stateful streaming source that emits each number from a given interval exactly once,
  * possibly in parallel.
  *
- * For the source to be re-scalable, the first time the job is run, we precompute all the elements
- * that each of the tasks should emit and upon checkpointing, each element constitutes its own
- * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
+ * For the source to be re-scalable, the range of elements to be emitted is initially (at the first execution)
+ * split into {@code min(maxParallelism, totalNumberOfElements)} partitions, and for each one, we
+ * store the {@code nextOffset}, i.e. the next element to be emitted, and its {@code end}. Upon rescaling, these
+ * partitions can be reshuffled among the new tasks, and these will resume emitting from where their predecessors
+ * left off.
  *
- * This strategy guarantees that each element will be emitted exactly-once, but elements will not
- * necessarily be emitted in ascending order, even for the same tasks.
+ * Although each element will be emitted exactly-once, elements will not necessarily be emitted in ascending order,
+ * even for the same task.
  */
 @PublicEvolving
 public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
@@ -50,9 +55,8 @@
 
 	private volatile boolean isRunning = true;
 
-	private transient Deque<Long> valuesToEmit;
-
-	private transient ListState<Long> checkpointedState;
+	private transient Map<Long, Long> endToNextOffsetMapping;
+	private transient ListState<Tuple2<Long, Long>> checkpointedState;
 
 	/**
 	 * Creates a source that emits all numbers from the given interval exactly once.
@@ -61,6 +65,7 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
@@ -68,45 +73,81 @@ public StatefulSequenceSource(long start, long end) {
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
-	
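(The quoted diff is cut off at this point in the archive.) The new class javadoc above describes each partition as a (nextOffset, end) pair that a task resumes from after a restore. A minimal, hypothetical sketch of that resume-and-emit idea follows; the names and the LongConsumer stand-in for Flink's SourceContext are illustrative, not the PR's code.

// Hypothetical sketch of the per-partition resume described in the javadoc above.
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongConsumer;

public final class PartitionResumeSketch {

	/** Emits the remaining elements of each partition, keyed as end -> nextOffset. */
	static void emitRemaining(Map<Long, Long> endToNextOffset, LongConsumer out) {
		for (Map.Entry<Long, Long> partition : endToNextOffset.entrySet()) {
			long end = partition.getKey();
			for (long value = partition.getValue(); value <= end; value++) {
				out.accept(value); // the real source would emit under the checkpoint lock
			}
		}
	}

	public static void main(String[] args) {
		Map<Long, Long> state = new HashMap<>();
		state.put(4L, 2L);  // partition [0, 4]: elements 2..4 still to emit
		state.put(9L, 5L);  // partition [5, 9]: nothing emitted yet
		emitRemaining(state, v -> System.out.println(v));
	}
}

Because the map iteration order is not tied to the numeric order of the partitions, elements are not necessarily emitted in ascending order, matching the javadoc's caveat.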

[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104907#comment-16104907
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3669#discussion_r130081125
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
-		Preconditions.checkState(this.checkpointedState == null,
+		Preconditions.checkState(checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
 		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
 			new ListStateDescriptor<>(
-				"stateful-sequence-source-state",
-				LongSerializer.INSTANCE
+				"stateful-sequence-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+					new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+				)
 			)
 		);
 
-		this.valuesToEmit = new ArrayDeque<>();
+		this.endToNextOffsetMapping = new HashMap<>();
 		if (context.isRestored()) {
-			// upon restoring
-
-			for (Long v : this.checkpointedState.get()) {
-				this.valuesToEmit.add(v);
+			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
--- End diff --

nit: empty spaces should surround ":"
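For context, the state registration visible in the diff above describes each partition as a Tuple2<Long, Long> of (end, nextOffset) and registers a list state for those pairs. A rough, hedged sketch of that pattern using the Flink 1.x classes named in the diff (not copied verbatim from the PR):

// Sketch of the state-descriptor construction shown in the diff above.
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;

public final class StateDescriptorSketch {

	@SuppressWarnings("unchecked")
	static ListStateDescriptor<Tuple2<Long, Long>> partitionStateDescriptor() {
		// Each list entry is one partition's (end, nextOffset) pair.
		TupleSerializer<Tuple2<Long, Long>> serializer = new TupleSerializer<>(
			(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
			new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE });

		return new ListStateDescriptor<>("stateful-sequence-source-state", serializer);
	}
}

Using a fixed serializer (rather than reflective type extraction) keeps the checkpointed format explicit, which matters because the PR changes the semantics of the stored state.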


> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104906#comment-16104906
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3669#discussion_r130080485
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
-		Preconditions.checkState(this.checkpointedState == null,
+		Preconditions.checkState(checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
 		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
 			new ListStateDescriptor<>(
-				"stateful-sequence-source-state",
-				LongSerializer.INSTANCE
+				"stateful-sequence-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+					new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+				)
 			)
 		);
 
-		this.valuesToEmit = new ArrayDeque<>();
+		this.endToNextOffsetMapping = new HashMap<>();
 		if (context.isRestored()) {
-			// upon restoring
-
-			for (Long v : this.checkpointedState.get()) {
-				this.valuesToEmit.add(v);
+			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
+				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
+				Preconditions.checkState(prev == null,
+					getClass().getSimpleName() + " : Duplicate entry when restoring.");
 			}
 		} else {
-			// the first time the job is executed
-
-			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
-			final long congruence = start + taskIdx;
+			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			final long totalElements = Math.abs(end - start + 1L);
+			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
 
-			long totalNoOfElements = Math.abs(end - start + 1);
-			final int baseSize = safeDivide(totalNoOfElements, stepSize);
-			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
+			int localStartIdx = localPartitionRange.f0;
+			int localEndIdx = localStartIdx + localPartitionRange.f1;
 
-			for (long collected = 0; collected < toCollect; collected++) {
-				this.valuesToEmit.add(collected * stepSize + congruence);
+			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
+				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
+				endToNextOffsetMapping.put(limits.f1, limits.f0);
 			}
 		}
 	}
 
+	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
+		int minPartitionSliceSize = totalPartitions / parallelTasks;
+		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
+
+		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
+		int localRangeSize = 

[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104908#comment-16104908
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3669#discussion_r130081876
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
 	 * @param end End of the range of numbers to emit.
 	 */
 	public StatefulSequenceSource(long start, long end) {
+		Preconditions.checkArgument(start <= end);
 		this.start = start;
 		this.end = end;
 	}
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
-		Preconditions.checkState(this.checkpointedState == null,
+		Preconditions.checkState(checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
 		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
 			new ListStateDescriptor<>(
-				"stateful-sequence-source-state",
-				LongSerializer.INSTANCE
+				"stateful-sequence-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+					new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+				)
 			)
 		);
 
-		this.valuesToEmit = new ArrayDeque<>();
+		this.endToNextOffsetMapping = new HashMap<>();
 		if (context.isRestored()) {
-			// upon restoring
-
-			for (Long v : this.checkpointedState.get()) {
-				this.valuesToEmit.add(v);
+			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
+				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
+				Preconditions.checkState(prev == null,
+					getClass().getSimpleName() + " : Duplicate entry when restoring.");
 			}
 		} else {
-			// the first time the job is executed
-
-			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
-			final long congruence = start + taskIdx;
+			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			final long totalElements = Math.abs(end - start + 1L);
+			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
 
-			long totalNoOfElements = Math.abs(end - start + 1);
-			final int baseSize = safeDivide(totalNoOfElements, stepSize);
-			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
+			int localStartIdx = localPartitionRange.f0;
+			int localEndIdx = localStartIdx + localPartitionRange.f1;
 
-			for (long collected = 0; collected < toCollect; collected++) {
-				this.valuesToEmit.add(collected * stepSize + congruence);
+			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
+				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
+				endToNextOffsetMapping.put(limits.f1, limits.f0);
 			}
 		}
 	}
 
+	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
+		int minPartitionSliceSize = totalPartitions / parallelTasks;
+		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
+
+		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
+		int localRangeSize = 
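(The quoted diff is cut off here in the archive.) Under the assumption that the visible lines implement an even split of the partition indices, with the remainder spread over the first tasks, here is a small self-contained sketch of that distribution; the names are hypothetical, not a completion of the PR's code.

// Hypothetical sketch: distribute totalPartitions over parallelTasks as evenly
// as possible, giving each task a contiguous [startIdx, startIdx + size) slice.
public final class LocalRangeSketch {

	/** Returns {startIdx, size} of the partition slice owned by taskIdx. */
	static int[] localRange(int totalPartitions, int parallelTasks, int taskIdx) {
		int minSliceSize = totalPartitions / parallelTasks;
		int remaining = totalPartitions - minSliceSize * parallelTasks;

		// The first 'remaining' tasks get one extra partition each.
		int startIdx = taskIdx * minSliceSize + Math.min(taskIdx, remaining);
		int size = minSliceSize + (taskIdx < remaining ? 1 : 0);
		return new int[] {startIdx, size};
	}

	public static void main(String[] args) {
		// Example: 10 partitions over 4 tasks -> slices of sizes 3, 3, 2, 2.
		for (int task = 0; task < 4; task++) {
			int[] r = localRange(10, 4, task);
			System.out.println("task " + task + " -> start=" + r[0] + ", size=" + r[1]);
		}
	}
}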

[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16004341#comment-16004341
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3669
  
I think this would be crucial. It does not even work for any meaningful 
tests the way the code is right now.

+1 for putting this into 1.3 as a bugfix


> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003111#comment-16003111
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3669
  
@greghogan Not sure. In my understanding this source was made for testing 
(which is why the old implementation was not efficient), but there may also be 
some more serious users. Any comments, @aljoscha?


> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003105#comment-16003105
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3669
  
@kl0u how important is this for 1.3?


> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.

2017-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954925#comment-15954925
 ] 

ASF GitHub Bot commented on FLINK-6215:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/3669

[FLINK-6215] Make the StatefulSequenceSource scalable.

So far this source computed all the elements to be emitted
and stored them in memory. This could lead to out-of-memory
problems for large deployments. Now we split the range of
elements into partitions that can be re-shuffled upon
rescaling, and we only store the next offset and the end of
each partition upon checkpointing.
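A hedged sketch of the checkpointing idea described above, storing one (end, nextOffset) pair per partition; it assumes the CheckpointedFunction-style list state used in the PR and is not its exact code.

// Rough sketch: persist per-partition progress instead of the remaining elements.
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;

public final class SnapshotSketch {

	/** Writes the per-partition progress (end -> nextOffset) into the operator list state. */
	static void snapshotPartitions(
			Map<Long, Long> endToNextOffsetMapping,
			ListState<Tuple2<Long, Long>> checkpointedState) throws Exception {

		checkpointedState.clear();
		for (Map.Entry<Long, Long> partition : endToNextOffsetMapping.entrySet()) {
			// f0 = partition end, f1 = next element still to be emitted
			checkpointedState.add(Tuple2.of(partition.getKey(), partition.getValue()));
		}
	}
}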

The current version of the PR has no backwards compatibility,
as this becomes tricky given that we change the semantics
of the state that we store.

I believe that this is ok, given that it is a fix that has to go
into 1.3 and we are not sure whether people are actually using it
in production, i.e. in settings that need backwards compatibility.

What do you think @aljoscha @StephanEwen ?



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink stateful-src

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3669


commit cf333b0b0c318569a1704ca71121c37dcd12bd3d
Author: kl0u 
Date:   2017-03-29T16:21:02Z

[FLINK-6215] Make the StatefulSequenceSource scalable.

So far this source computed all the elements to be emitted
and stored them in memory. This could lead to out-of-memory
problems for large deployments. Now we split the range of
elements into partitions that can be re-shuffled upon
rescaling, and we only store the next offset and the end of
each partition upon checkpointing.




> Make the StatefulSequenceSource scalable.
> -
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)