AHeise commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2408539975
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java:
##########
@@ -56,73 +58,108 @@ public void testEnumStateSerde() throws IOException {
         final KafkaSourceEnumState restoredState =
                 serializer.deserialize(serializer.getVersion(), bytes);

-        assertThat(restoredState.assignedPartitions()).isEqualTo(state.assignedPartitions());
-        assertThat(restoredState.unassignedInitialPartitions())
-                .isEqualTo(state.unassignedInitialPartitions());
+        assertThat(restoredState.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(state.assignedSplits());
+        assertThat(restoredState.unassignedSplits())
+                .containsExactlyInAnyOrderElementsOf(state.unassignedSplits());
         assertThat(restoredState.initialDiscoveryFinished()).isTrue();
     }

     @Test
     public void testBackwardCompatibility() throws IOException {
-        final Set<TopicPartition> topicPartitions = constructTopicPartitions(0);
-        final Map<Integer, Set<KafkaPartitionSplit>> splitAssignments =
-                toSplitAssignments(topicPartitions);
+        final Set<KafkaPartitionSplit> splits = constructTopicSplits(0);
+        final Map<Integer, Collection<KafkaPartitionSplit>> splitAssignments =
+                toSplitAssignments(splits);
+        final List<SplitAndAssignmentStatus> splitAndAssignmentStatuses =
+                splits.stream()
+                        .map(
+                                split ->
+                                        new SplitAndAssignmentStatus(
+                                                split,
+                                                getAssignmentStatus(split)))
+                        .collect(Collectors.toList());

         // Create bytes in the way of KafkaEnumStateSerializer version 0 doing serialization
         final byte[] bytesV0 =
                 SerdeUtils.serializeSplitAssignments(
                         splitAssignments, new KafkaPartitionSplitSerializer());

         // Create bytes in the way of KafkaEnumStateSerializer version 1 doing serialization
-        final byte[] bytesV1 =
-                KafkaSourceEnumStateSerializer.serializeTopicPartitions(topicPartitions);
+        final byte[] bytesV1 = KafkaSourceEnumStateSerializer.serializeV1(splits);
+        final byte[] bytesV2 =
+                KafkaSourceEnumStateSerializer.serializeV2(splitAndAssignmentStatuses, false);

         // Deserialize above bytes with KafkaEnumStateSerializer version 2 to check backward
         // compatibility
         final KafkaSourceEnumState kafkaSourceEnumStateV0 =
                 new KafkaSourceEnumStateSerializer().deserialize(0, bytesV0);
         final KafkaSourceEnumState kafkaSourceEnumStateV1 =
                 new KafkaSourceEnumStateSerializer().deserialize(1, bytesV1);
+        final KafkaSourceEnumState kafkaSourceEnumStateV2 =
+                new KafkaSourceEnumStateSerializer().deserialize(2, bytesV2);

-        assertThat(kafkaSourceEnumStateV0.assignedPartitions()).isEqualTo(topicPartitions);
-        assertThat(kafkaSourceEnumStateV0.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV0.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splits);
+        assertThat(kafkaSourceEnumStateV0.unassignedSplits()).isEmpty();
         assertThat(kafkaSourceEnumStateV0.initialDiscoveryFinished()).isTrue();

-        assertThat(kafkaSourceEnumStateV1.assignedPartitions()).isEqualTo(topicPartitions);
-        assertThat(kafkaSourceEnumStateV1.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV1.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splits);
+        assertThat(kafkaSourceEnumStateV1.unassignedSplits()).isEmpty();
         assertThat(kafkaSourceEnumStateV1.initialDiscoveryFinished()).isTrue();
+
+        final Map<AssignmentStatus, Set<KafkaPartitionSplit>> splitsByStatus =
+                splitAndAssignmentStatuses.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        SplitAndAssignmentStatus::assignmentStatus,
+                                        Collectors.mapping(
+                                                SplitAndAssignmentStatus::split,
+                                                Collectors.toSet())));
+        assertThat(kafkaSourceEnumStateV2.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splitsByStatus.get(AssignmentStatus.ASSIGNED));
+        assertThat(kafkaSourceEnumStateV2.unassignedSplits())
+                .containsExactlyInAnyOrderElementsOf(
+                        splitsByStatus.get(AssignmentStatus.UNASSIGNED));
+        assertThat(kafkaSourceEnumStateV2.initialDiscoveryFinished()).isFalse();
+    }
+
+    private static AssignmentStatus getAssignmentStatus(KafkaPartitionSplit split) {
+        return AssignmentStatus.values()[
+                Math.abs(split.hashCode()) % AssignmentStatus.values().length];
     }

-    private Set<TopicPartition> constructTopicPartitions(int startPartition) {
+    private Set<KafkaPartitionSplit> constructTopicSplits(int startPartition) {
         // Create topic partitions for readers.
         // Reader i will be assigned with NUM_PARTITIONS_PER_TOPIC splits, with topic name
         // "topic-{i}" and
         // NUM_PARTITIONS_PER_TOPIC partitions. The starting partition number is startPartition
         // Totally NUM_READERS * NUM_PARTITIONS_PER_TOPIC partitions will be created.
-        Set<TopicPartition> topicPartitions = new HashSet<>();
+        Set<KafkaPartitionSplit> topicPartitions = new HashSet<>();
         for (int readerId = 0; readerId < NUM_READERS; readerId++) {
             for (int partition = startPartition;
                     partition < startPartition + NUM_PARTITIONS_PER_TOPIC;
                     partition++) {
-                topicPartitions.add(new TopicPartition(TOPIC_PREFIX + readerId, partition));
+                topicPartitions.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC_PREFIX + readerId, partition),
+                                STARTING_OFFSET));
Review Comment:
I had to change the logic a bit and introduced a new special value `MIGRATED`, which all unit tests now run against. However, I also added a test with specific offsets to `KafkaSourceEnumeratorTest`.
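
To make the idea concrete for anyone skimming the thread, here is a minimal, hypothetical sketch of such a sentinel; the class, constant name, and value below are illustrative assumptions, not this PR's actual definition. `KafkaPartitionSplit` already models special starting offsets as negative sentinels (`LATEST_OFFSET`, `EARLIEST_OFFSET`, `COMMITTED_OFFSET`), so a migration marker would plausibly continue that scheme:

```java
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.common.TopicPartition;

public class MigratedSentinelSketch {

    // Hypothetical sentinel: NOT the constant added in this PR, just an
    // illustration. Existing sentinels in KafkaPartitionSplit are negative
    // (LATEST_OFFSET = -1, EARLIEST_OFFSET = -2, COMMITTED_OFFSET = -3),
    // so a migration marker would likely follow the same pattern.
    public static final long MIGRATED = -4L;

    public static void main(String[] args) {
        // A split restored from a pre-V2 checkpoint carries no recorded
        // starting offset, so it is tagged with the sentinel instead of a
        // concrete position, leaving the enumerator to resolve it later.
        KafkaPartitionSplit restored =
                new KafkaPartitionSplit(new TopicPartition("topic-0", 0), MIGRATED);
        System.out.println(restored.getStartingOffset() == MIGRATED); // true
    }
}
```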