[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104908#comment-16104908 ]

ASF GitHub Bot commented on FLINK-6215:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130081876
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -61,52 +65,89 @@
         * @param end End of the range of numbers to emit.
         */
        public StatefulSequenceSource(long start, long end) {
    +           Preconditions.checkArgument(start <= end);
                this.start = start;
                this.end = end;
        }
     
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
     
    -           Preconditions.checkState(this.checkpointedState == null,
    +           Preconditions.checkState(checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
     
                this.checkpointedState = context.getOperatorStateStore().getOperatorState(
                        new ListStateDescriptor<>(
    -                           "stateful-sequence-source-state",
    -                           LongSerializer.INSTANCE
    +                           "stateful-sequence-source-state", 
    +                                   new TupleSerializer<>(
    +                                                   (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
    +                                                   new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +                                   )
                        )
                );
     
    -           this.valuesToEmit = new ArrayDeque<>();
    +           this.endToNextOffsetMapping = new HashMap<>();
                if (context.isRestored()) {
    -                   // upon restoring
    -
    -                   for (Long v : this.checkpointedState.get()) {
    -                           this.valuesToEmit.add(v);
    +                   for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
    +                           Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +                           Preconditions.checkState(prev == null,
    +                                           getClass().getSimpleName() + " : Duplicate entry when restoring.");
                        }
                } else {
    -                   // the first time the job is executed
    -
    -                   final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
                        final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
    -                   final long congruence = start + taskIdx;
    +                   final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +                   final long totalElements = Math.abs(end - start + 1L);
    +                   final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +                   final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
     
    -                   long totalNoOfElements = Math.abs(end - start + 1);
    -                   final int baseSize = safeDivide(totalNoOfElements, stepSize);
    -                   final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    +                   Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +                   int localStartIdx = localPartitionRange.f0;
    +                   int localEndIdx = localStartIdx + localPartitionRange.f1;
     
    -                   for (long collected = 0; collected < toCollect; collected++) {
    -                           this.valuesToEmit.add(collected * stepSize + congruence);
    +                   for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
    +                           Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
    +                           endToNextOffsetMapping.put(limits.f1, limits.f0);
                        }
                }
        }
     
    +   private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
    +           int minPartitionSliceSize = totalPartitions / parallelTasks;
    +           int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
    +
    +           int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
    +           int localRangeSize = taskIdx < remainingPartitions ? minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +           return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +   }
    +
    +   private Tuple2<Long, Long> getPartitionLimits(long totalElements, int totalPartitions, long partitionIdx) {
    +           long minElementPartitionSize = totalElements / totalPartitions;
    +           long remainingElements = totalElements - minElementPartitionSize * totalPartitions;
    +           long startOffset = start;
    +
    +           for (int idx = 0; idx < partitionIdx; idx++) {
    +                   long partitionSize = idx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +                   startOffset += partitionSize;
    +           }
    +
    +           long partitionSize = partitionIdx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +           return new Tuple2<>(startOffset, startOffset + partitionSize);
    +   }
    +
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
    -           while (isRunning && !this.valuesToEmit.isEmpty()) {
    -                   synchronized (ctx.getCheckpointLock()) {
    -                           ctx.collect(this.valuesToEmit.poll());
    +           for (Map.Entry<Long, Long> partition: endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    I wonder if it would make sense to emit the local ranges in order of increasing "end offsets". That way, the values emitted by each subtask would at least always be increasing.
    While we can't really guarantee global ordering with this new rescalable implementation, we could still make a best effort at it locally. What do you think?
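
For illustration, a minimal sketch of that best-effort local ordering, assuming the endToNextOffsetMapping layout from the diff (end offset, exclusive, mapped to the next value to emit). The helper class and the LongConsumer stand-in for the source context are hypothetical, not part of the PR:

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.LongConsumer;

    final class OrderedEmissionSketch {

        // Emit each local partition's remaining values, visiting partitions in
        // increasing order of their end offset so the subtask's output is locally increasing.
        static void emitInOrder(Map<Long, Long> endToNextOffsetMapping, LongConsumer collect) {
            // A TreeMap copy sorts the partitions by their end offset (the map key).
            for (Map.Entry<Long, Long> partition : new TreeMap<>(endToNextOffsetMapping).entrySet()) {
                long end = partition.getKey();      // exclusive end of the partition
                long next = partition.getValue();   // next value still to be emitted
                while (next < end) {
                    collect.accept(next++);
                    // In the real source this update would happen under the checkpoint lock,
                    // so a snapshot always sees the current next offset.
                    endToNextOffsetMapping.put(end, next);
                }
            }
        }
    }

In the actual run() method the emission would of course go through ctx.collect() under ctx.getCheckpointLock(), as in the diff; the sketch only shows the ordering idea.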


> Make the StatefulSequenceSource scalable.
> -----------------------------------------
>
>                 Key: FLINK-6215
>                 URL: https://issues.apache.org/jira/browse/FLINK-6215
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.4.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to
> emit up front and keeps them in memory. This is not scalable, as for large
> sequences of elements it can lead to out-of-memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the
> {{maxParallelism}} parameter, and only keep state (to checkpoint) per such
> partition.
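
A rough sketch of that pre-partitioning idea (illustrative only; the class and method names are made up here, and the actual PR derives the limits lazily per partition rather than materializing a list):

    import org.apache.flink.api.java.tuple.Tuple2;

    import java.util.ArrayList;
    import java.util.List;

    final class RangePartitionerSketch {

        // Split the inclusive range [start, end] into totalPartitions contiguous
        // [startInclusive, endExclusive) ranges. Only a (nextValue, endExclusive) pair
        // per partition then needs to be checkpointed, not the elements themselves.
        static List<Tuple2<Long, Long>> partition(long start, long end, int totalPartitions) {
            long totalElements = end - start + 1L;
            long minSize = totalElements / totalPartitions;
            long remainder = totalElements % totalPartitions;

            List<Tuple2<Long, Long>> limits = new ArrayList<>(totalPartitions);
            long offset = start;
            for (int i = 0; i < totalPartitions; i++) {
                long size = i < remainder ? minSize + 1L : minSize;
                limits.add(new Tuple2<>(offset, offset + size));
                offset += size;
            }
            return limits;
        }
    }

For example, partition(0, 9, 4) yields [0, 3), [3, 6), [6, 8), [8, 10): ten elements spread over four partitions, with the remainder going to the lower-indexed partitions, matching the balancing done by getPartitionLimits in the PR.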


