GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4955#discussion_r149310451
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java ---
    @@ -19,27 +19,35 @@
     
     import java.util.HashSet;
     import java.util.Set;
    -import java.util.stream.Collectors;
    -import java.util.stream.LongStream;
     
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Class responsible for generating transactional ids to use when communicating with Kafka.
    + *
    + * <p>It guarantees that:
    + * - generated ids to use will never clash with ids to use from different subtasks
    + * - generated ids to abort will never clash with ids to abort from different subtasks
    + * - generated ids to use will never clash with ids to abort from different subtasks
    + *
    + * <p>In other words, any particular generated id will always be assigned to one and only one subtask.
      */
     public class TransactionalIdsGenerator {
        private final String prefix;
        private final int subtaskIndex;
    +   private final int totalNumberOfSubtasks;
        private final int poolSize;
        private final int safeScaleDownFactor;
     
        public TransactionalIdsGenerator(
                        String prefix,
                        int subtaskIndex,
    +                   int totalNumberOfSubtasks,
                        int poolSize,
                        int safeScaleDownFactor) {
                this.prefix = checkNotNull(prefix);
                this.subtaskIndex = subtaskIndex;
    +           this.totalNumberOfSubtasks = totalNumberOfSubtasks;
    --- End diff --
    
    Maybe we should add some argument checks in the constructor, at least for 
`subtaskIndex` and `totalNumberOfSubtasks`.
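
    A minimal sketch of what such checks could look like. It uses plain Java 
    so that it runs without Flink on the classpath. The class and method names 
    (`SubtaskArgumentChecks`, `checkSubtaskArguments`) are hypothetical and not 
    part of the pull request, and the bounds assume that subtask indices are 
    zero-based and strictly smaller than the total number of subtasks.

```java
public class SubtaskArgumentChecks {

    /**
     * Hypothetical helper, not taken from the pull request. Validates the two
     * values before a constructor would assign them to its fields.
     *
     * Assumption: subtaskIndex is zero-based, so valid values are
     * 0 .. totalNumberOfSubtasks - 1.
     */
    public static void checkSubtaskArguments(int subtaskIndex, int totalNumberOfSubtasks) {
        if (totalNumberOfSubtasks <= 0) {
            throw new IllegalArgumentException(
                "totalNumberOfSubtasks must be positive, but was " + totalNumberOfSubtasks);
        }
        if (subtaskIndex < 0 || subtaskIndex >= totalNumberOfSubtasks) {
            throw new IllegalArgumentException(
                "subtaskIndex must be in [0, " + totalNumberOfSubtasks
                    + "), but was " + subtaskIndex);
        }
    }

    public static void main(String[] args) {
        // A valid combination passes silently.
        checkSubtaskArguments(0, 4);

        // An index equal to the subtask count is out of range and is rejected.
        try {
            checkSubtaskArguments(4, 4);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    Inside Flink itself the same conditions could presumably be expressed with 
    `Preconditions.checkArgument`, next to the `checkNotNull` call the class 
    already uses.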

