[ 
https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104906#comment-16104906
 ] 

ASF GitHub Bot commented on FLINK-6215:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130080485
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
 ---
    @@ -61,52 +65,89 @@
         * @param end End of the range of numbers to emit.
         */
        public StatefulSequenceSource(long start, long end) {
    +           Preconditions.checkArgument(start <= end);
                this.start = start;
                this.end = end;
        }
     
        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
     
    -           Preconditions.checkState(this.checkpointedState == null,
    +           Preconditions.checkState(checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
     
                this.checkpointedState = 
context.getOperatorStateStore().getOperatorState(
                        new ListStateDescriptor<>(
    -                           "stateful-sequence-source-state",
    -                           LongSerializer.INSTANCE
    +                           "stateful-sequence-source-state", 
    +                                   new TupleSerializer<>(
    +                                                   (Class<Tuple2<Long, 
Long>>) (Class<?>) Tuple2.class,
    +                                                   new TypeSerializer<?>[] 
{ LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +                                   )
                        )
                );
     
    -           this.valuesToEmit = new ArrayDeque<>();
    +           this.endToNextOffsetMapping = new HashMap<>();
                if (context.isRestored()) {
    -                   // upon restoring
    -
    -                   for (Long v : this.checkpointedState.get()) {
    -                           this.valuesToEmit.add(v);
    +                   for (Tuple2<Long, Long> partitionInfo: 
checkpointedState.get()) {
    +                           Long prev = 
endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +                           Preconditions.checkState(prev == null,
    +                                           getClass().getSimpleName() + " 
: Duplicate entry when restoring.");
                        }
                } else {
    -                   // the first time the job is executed
    -
    -                   final int stepSize = 
getRuntimeContext().getNumberOfParallelSubtasks();
                        final int taskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
    -                   final long congruence = start + taskIdx;
    +                   final int parallelTasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +                   final long totalElements = Math.abs(end - start + 1L);
    +                   final int maxParallelism = 
getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +                   final int totalPartitions = totalElements < 
Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : 
maxParallelism;
     
    -                   long totalNoOfElements = Math.abs(end - start + 1);
    -                   final int baseSize = safeDivide(totalNoOfElements, 
stepSize);
    -                   final int toCollect = (totalNoOfElements % stepSize > 
taskIdx) ? baseSize + 1 : baseSize;
    +                   Tuple2<Integer, Integer> localPartitionRange = 
getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +                   int localStartIdx = localPartitionRange.f0;
    +                   int localEndIdx = localStartIdx + 
localPartitionRange.f1;
     
    -                   for (long collected = 0; collected < toCollect; 
collected++) {
    -                           this.valuesToEmit.add(collected * stepSize + 
congruence);
    +                   for (int partIdx = localStartIdx; partIdx < 
localEndIdx; partIdx++) {
    +                           Tuple2<Long, Long> limits = 
getPartitionLimits(totalElements, totalPartitions, partIdx);
    +                           endToNextOffsetMapping.put(limits.f1, 
limits.f0);
                        }
                }
        }
     
    +   private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int 
parallelTasks, int taskIdx) {
    +           int minPartitionSliceSize = totalPartitions / parallelTasks;
    +           int remainingPartitions = totalPartitions - 
minPartitionSliceSize * parallelTasks;
    +
    +           int localRangeStartIdx = taskIdx * minPartitionSliceSize + 
Math.min(taskIdx, remainingPartitions);
    +           int localRangeSize = taskIdx < remainingPartitions ? 
minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +           return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +   }
    +
    +   private Tuple2<Long, Long> getPartitionLimits(long totalElements, int 
totalPartitions, long partitionIdx) {
    +           long minElementPartitionSize = totalElements / totalPartitions;
    +           long remainingElements = totalElements - 
minElementPartitionSize * totalPartitions;
    +           long startOffset = start;
    +
    +           for (int idx = 0; idx < partitionIdx; idx++) {
    +                   long partitionSize = idx < remainingElements ? 
minElementPartitionSize + 1L : minElementPartitionSize;
    +                   startOffset += partitionSize;
    +           }
    +
    +           long partitionSize = partitionIdx < remainingElements ? 
minElementPartitionSize + 1L : minElementPartitionSize;
    +           return new Tuple2<>(startOffset, startOffset + partitionSize);
    +   }
    +
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
    -           while (isRunning && !this.valuesToEmit.isEmpty()) {
    -                   synchronized (ctx.getCheckpointLock()) {
    -                           ctx.collect(this.valuesToEmit.poll());
    +           for (Map.Entry<Long, Long> partition: 
endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    nit: empty spaces should surround ":"


> Make the StatefulSequenceSource scalable.
> -----------------------------------------
>
>                 Key: FLINK-6215
>                 URL: https://issues.apache.org/jira/browse/FLINK-6215
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.4.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to 
> emit first and keeps them in memory. This is not scalable as for large 
> sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the 
> {{maxParallelism}} parameter, and just keep state (to checkpoint) per such 
> partition.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to