GitHub user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149310451 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -19,27 +19,35 @@ import java.util.HashSet; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.LongStream; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * <p>It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * <p>In other words, any particular generated id will always be assigned to one and only one subtask. */ public class TransactionalIdsGenerator { private final String prefix; private final int subtaskIndex; + private final int totalNumberOfSubtasks; private final int poolSize; private final int safeScaleDownFactor; public TransactionalIdsGenerator( String prefix, int subtaskIndex, + int totalNumberOfSubtasks, int poolSize, int safeScaleDownFactor) { this.prefix = checkNotNull(prefix); this.subtaskIndex = subtaskIndex; + this.totalNumberOfSubtasks = totalNumberOfSubtasks; --- End diff -- Maybe we should add some argument checks here, at least for subtaskIndex and totalNumberOfSubtasks.
---