fapaul commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2397330827


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -132,22 +197,43 @@ private static KafkaSourceEnumState deserializeAssignedTopicPartitions(
         }
     }
 
-    private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus(
-            byte[] serialized) throws IOException {
+    @VisibleForTesting
+    static byte[] serializeV2(
+            Collection<SplitAndAssignmentStatus> splits, boolean initialDiscoveryFinished)
+            throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(splits.size());
+            for (SplitAndAssignmentStatus splitAndAssignmentStatus : splits) {
+                final TopicPartition topicPartition =
+                        splitAndAssignmentStatus.split().getTopicPartition();
+                out.writeUTF(topicPartition.topic());
+                out.writeInt(topicPartition.partition());
+                out.writeInt(splitAndAssignmentStatus.assignmentStatus().getStatusCode());
+            }
+            out.writeBoolean(initialDiscoveryFinished);
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    private static KafkaSourceEnumState deserializeVersion2(byte[] serialized) throws IOException {
 
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 DataInputStream in = new DataInputStream(bais)) {
 
             final int numPartitions = in.readInt();
-            Set<TopicPartitionAndAssignmentStatus> partitions = new HashSet<>(numPartitions);
+            Set<SplitAndAssignmentStatus> partitions = new HashSet<>(numPartitions);
 
             for (int i = 0; i < numPartitions; i++) {
                 final String topic = in.readUTF();
                 final int partition = in.readInt();
                 final int statusCode = in.readInt();
                 partitions.add(
-                        new TopicPartitionAndAssignmentStatus(
-                                new TopicPartition(topic, partition),
+                        new SplitAndAssignmentStatus(
+                                new KafkaPartitionSplit(
+                                        new TopicPartition(topic, partition),
+                                        DEFAULT_STARTING_OFFSET),

Review Comment:
   Isn't this a behavioral change? Previously the unassigned split would get 
the starting offset configured by the user on reassignment.
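
For illustration, a minimal sketch of why the offset cannot survive the V2 round trip: the format in the diff above writes only topic, partition and status code, so the reader has to fill in some default. The `Split` class and `PLACEHOLDER_OFFSET` are hypothetical stand-ins (the actual value of `DEFAULT_STARTING_OFFSET` is not shown in this hunk), and this is not the connector's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class V2RoundTripSketch {
    // Hypothetical placeholder; the real DEFAULT_STARTING_OFFSET is not shown in the diff.
    static final long PLACEHOLDER_OFFSET = Long.MIN_VALUE;

    // Hypothetical stand-in for a split: a topic partition plus a starting offset.
    static final class Split {
        final String topic;
        final int partition;
        final long startingOffset;

        Split(String topic, int partition, long startingOffset) {
            this.topic = topic;
            this.partition = partition;
            this.startingOffset = startingOffset;
        }
    }

    // Mirrors the V2 layout from the diff: count, (topic, partition, status)*, flag.
    static byte[] writeV2(Split split, int statusCode) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(1);
            out.writeUTF(split.topic);
            out.writeInt(split.partition);
            out.writeInt(statusCode); // note: split.startingOffset is never written
            out.writeBoolean(true);
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads a single-split V2 payload; the offset has to be filled with a default.
    static Split readV2(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            in.readInt(); // number of splits, always 1 in this sketch
            String topic = in.readUTF();
            int partition = in.readInt();
            in.readInt(); // status code, ignored in this sketch
            return new Split(topic, partition, PLACEHOLDER_OFFSET);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whatever offset the split carried before serialization is replaced by the default on restore, so what happens on reassignment depends on how that default is interpreted downstream.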



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java:
##########
@@ -662,63 +688,84 @@ private void verifyLastReadersAssignments(
             Collection<Integer> readers,
             Set<String> topics,
             int expectedAssignmentSeqSize) {
+        verifyLastReadersAssignments(
+                context, readers, topics, expectedAssignmentSeqSize, OffsetsInitializer.earliest());
+    }
+
+    private void verifyLastReadersAssignments(
+            MockSplitEnumeratorContext<KafkaPartitionSplit> context,
+            Collection<Integer> readers,
+            Set<String> topics,
+            int expectedAssignmentSeqSize,
+            OffsetsInitializer offsetsInitializer) {
         verifyAssignments(
-                getExpectedAssignments(new HashSet<>(readers), topics),
+                getExpectedAssignments(new HashSet<>(readers), topics, offsetsInitializer),
                 context.getSplitsAssignmentSequence()
                         .get(expectedAssignmentSeqSize - 1)
                         .assignment());
     }
 
     private void verifyAssignments(
-            Map<Integer, Set<TopicPartition>> expectedAssignments,
+            Map<Integer, Collection<KafkaPartitionSplit>> expectedAssignments,
             Map<Integer, List<KafkaPartitionSplit>> actualAssignments) {
-        actualAssignments.forEach(
-                (reader, splits) -> {
-                    Set<TopicPartition> expectedAssignmentsForReader =
-                            expectedAssignments.get(reader);
-                    assertThat(expectedAssignmentsForReader).isNotNull();
-                    assertThat(splits.size()).isEqualTo(expectedAssignmentsForReader.size());
-                    for (KafkaPartitionSplit split : splits) {
-                        assertThat(expectedAssignmentsForReader)
-                                .contains(split.getTopicPartition());
+        assertThat(actualAssignments).containsOnlyKeys(expectedAssignments.keySet());
+        SoftAssertions.assertSoftly(

Review Comment:
   I am not a big fan of soft assertions, but I think this pattern with the
lambda is "okay". It's just a dangerous precedent: folks may start using the
version that requires explicitly calling `assertAll()`, and if that call is
forgotten we silently disable all assertions.
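
To make the concern concrete, here is a minimal sketch of the two styles. `SoftChecks` is a hypothetical stand-in written for this example, not AssertJ's implementation; it only mimics the relevant behaviour, where `assertSoftly` with a lambda calls `assertAll()` for you and the manual style relies on the author remembering it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SoftChecksSketch {
    // Hypothetical soft-assertion collector: failures are recorded, not thrown.
    static final class SoftChecks {
        private final List<String> failures = new ArrayList<>();

        void check(boolean condition, String message) {
            if (!condition) {
                failures.add(message);
            }
        }

        // Only this call turns the recorded failures into an error.
        void assertAll() {
            if (!failures.isEmpty()) {
                throw new AssertionError(String.join("; ", failures));
            }
        }
    }

    // Lambda style: the helper always calls assertAll() after the body ran.
    static void assertSoftly(Consumer<SoftChecks> body) {
        SoftChecks softly = new SoftChecks();
        body.accept(softly);
        softly.assertAll();
    }

    // Returns true if a failing check surfaces as an AssertionError.
    static boolean lambdaStyleReportsFailure(int actual, int expected) {
        try {
            assertSoftly(softly -> softly.check(actual == expected, "values differ"));
            return false;
        } catch (AssertionError e) {
            return true;
        }
    }

    // Manual style with assertAll() forgotten: the failure is silently dropped.
    static boolean manualStyleReportsFailure(int actual, int expected) {
        try {
            SoftChecks softly = new SoftChecks();
            softly.check(actual == expected, "values differ");
            // softly.assertAll() is missing here on purpose.
            return false;
        } catch (AssertionError e) {
            return true;
        }
    }
}
```

With mismatching values the lambda style raises an `AssertionError`, while the manual variant without `assertAll()` returns normally, so a test written that way passes even though the check failed.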



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -96,7 +96,7 @@ public class KafkaSourceEnumerator
     // initializing partition discovery has finished.
     private boolean noMoreNewPartitionSplits = false;
     // this flag will be marked as true if initial partitions are discovered after enumerator starts
-    private boolean initialDiscoveryFinished;
+    private volatile boolean initialDiscoveryFinished;

Review Comment:
   Can you explain why volatile is needed here? Afaik we don't access it from a 
different thread than before.
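
For reference, a minimal sketch of the only situation in which `volatile` would matter: a flag written by one thread and polled by another. The class and method names are hypothetical, and whether the enumerator has such a cross-thread access at all is exactly the open question here.

```java
class VolatileFlagSketch {
    // volatile guarantees that the worker thread eventually sees the write below.
    private static volatile boolean finished;

    // Returns true if the worker observed the flag and terminated within 5 seconds.
    static boolean workerSeesFlag() {
        finished = false;
        Thread worker = new Thread(() -> {
            while (!finished) {
                // Busy-wait until another thread sets the flag.
            }
        });
        worker.setDaemon(true);
        worker.start();

        finished = true; // write from the calling thread

        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

If the flag is only ever read and written on a single thread, the modifier adds no visibility guarantee that is actually needed.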



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -229,38 +239,16 @@ public void close() {
      *
      * @return Set of subscribed {@link TopicPartition}s
      */
-    private Set<TopicPartition> getSubscribedTopicPartitions() {
-        return subscriber.getSubscribedTopicPartitions(adminClient);
-    }
-
-    /**
-     * Check if there's any partition changes within subscribed topic partitions fetched by worker
-     * thread, and invoke {@link KafkaSourceEnumerator#initializePartitionSplits(PartitionChange)}
-     * in worker thread to initialize splits for new partitions.
-     *
-     * <p>NOTE: This method should only be invoked in the coordinator executor thread.
-     *
-     * @param fetchedPartitions Map from topic name to its description
-     * @param t Exception in worker thread
-     */
-    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) {

Review Comment:
   Nit: It would have been good to have a small, separate refactoring commit
that simplifies the method calls; it makes the reviewer's job easier :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]