loserwang1024 opened a new pull request, #28:
URL: https://github.com/apache/flink-connector-kafka/pull/28

   ### What is the purpose of the change
   
   As described in 
[FLIP-288](https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source),
 the strategy used for new partitions is the same as the initial offset 
strategy, which is not reasonable.
   
   According to the semantics, if the startup strategy is latest, the consumed 
data should include all data from the moment of startup, which also includes 
all messages from newly created partitions. However, the latest strategy 
may currently be applied to new partitions as well, leading to the loss of some 
data. For example, a new partition may be created but only discovered by the 
Kafka source several minutes later; if "latest" is used as the initial offset 
strategy, the messages produced into the partition within that gap are dropped. 
If the data from new partitions is not read in full, the behavior does not meet 
the user's expectations.
   
   For other problems, see the final section, `User specifies OffsetsInitializer 
for new partition`.
   
   Therefore, it’s better to provide an **EARLIEST** strategy for later 
discovered partitions.
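
   The gap described above can be illustrated with a small toy model. This is 
only a sketch: it does not use the Kafka client or the connector, and the class 
`NewPartitionGapSketch`, the enum `Strategy`, and the method `consumedOffsets` 
are hypothetical names introduced for illustration. It assumes a partition that 
already holds some records when the source discovers it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition's log; it does not use the Kafka client API. */
final class NewPartitionGapSketch {

    /** Hypothetical stand-ins for the two offset strategies discussed above. */
    enum Strategy { EARLIEST, LATEST }

    /**
     * Offsets read from a partition that already held endOffsetAtDiscovery
     * records when the source discovered it and later grew to finalEndOffset.
     */
    static List<Long> consumedOffsets(
            Strategy strategy, long endOffsetAtDiscovery, long finalEndOffset) {
        // EARLIEST starts at the beginning of the log; LATEST starts at the
        // end offset observed at discovery time, skipping everything before it.
        long start = (strategy == Strategy.EARLIEST) ? 0L : endOffsetAtDiscovery;
        List<Long> consumed = new ArrayList<>();
        for (long offset = start; offset < finalEndOffset; offset++) {
            consumed.add(offset);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Three records arrived before discovery, two more afterwards.
        System.out.println(consumedOffsets(Strategy.LATEST, 3, 5));   // [3, 4]
        System.out.println(consumedOffsets(Strategy.EARLIEST, 3, 5)); // [0, 1, 2, 3, 4]
    }
}
```

   With LATEST, the three records written before discovery are never read; with 
EARLIEST, nothing is skipped.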
   
   ### Brief change log
   
   1. Expand `KafkaSourceEnumState` with `TopicPartitionWithAssignStatus` to 
distinguish between initial partitions and newly discovered partitions. 
`TopicPartitionWithAssignStatus` is also better for future expansion, as new 
statuses can be added without changing the structure of the state.
   2. Add a `newDiscoveryOffsetsInitializer` (EARLIEST) to get offsets for newly 
discovered partitions. 
   3. Modify `KafkaSourceEnumStateSerializer` to handle the expanded 
`KafkaSourceEnumState`.
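
   The idea behind the change log can be sketched as follows. This is not the 
code in this PR: the class `AssignStatusSketch`, the status names, and the method 
`strategyFor` are assumptions made for illustration, and a plain enum stands in 
for the connector's offsets initializers. It only shows how a per-partition 
status lets the enumerator pick a different starting strategy for initial and 
newly discovered partitions.

```java
/** Hypothetical sketch; names and statuses are illustrative, not the PR's code. */
final class AssignStatusSketch {

    /** Stand-in for the offsets initializers used by the source. */
    enum OffsetStrategy { EARLIEST, LATEST }

    /** Assumed statuses; more could be added later without reshaping the state. */
    enum AssignStatus { ASSIGNED, UNASSIGNED_INITIAL, UNASSIGNED_NEW }

    /** Pairs a partition with its status, as the expanded enumerator state would. */
    static final class PartitionWithStatus {
        final String topic;
        final int partition;
        final AssignStatus status;

        PartitionWithStatus(String topic, int partition, AssignStatus status) {
            this.topic = topic;
            this.partition = partition;
            this.status = status;
        }
    }

    /**
     * Initial partitions follow the user-specified starting strategy, while
     * partitions discovered later always start from EARLIEST.
     */
    static OffsetStrategy strategyFor(AssignStatus status, OffsetStrategy startingStrategy) {
        switch (status) {
            case UNASSIGNED_INITIAL:
                return startingStrategy;
            case UNASSIGNED_NEW:
                return OffsetStrategy.EARLIEST;
            default:
                // An assigned partition already has a starting offset.
                throw new IllegalArgumentException("Partition is already assigned");
        }
    }

    public static void main(String[] args) {
        PartitionWithStatus initial =
                new PartitionWithStatus("orders", 0, AssignStatus.UNASSIGNED_INITIAL);
        PartitionWithStatus added =
                new PartitionWithStatus("orders", 1, AssignStatus.UNASSIGNED_NEW);
        System.out.println(strategyFor(initial.status, OffsetStrategy.LATEST)); // LATEST
        System.out.println(strategyFor(added.status, OffsetStrategy.LATEST));   // EARLIEST
    }
}
```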
   
   ### Verifying this change
   
   1. Test the backward compatibility of state when deserializing in 
`KafkaSourceEnumStateSerializerTest`.
   2. Expand the `KafkaEnumeratorTest#testSnapshotState` method to test the 
snapshot state in more scenarios:
       1. Before the first discovery, so the state should be empty
       2. First partition discovery after start, but no assignments to readers
       3. Assign some of the partitions to readers
       4. Assign all partitions to readers
   3. Expand the `KafkaEnumeratorTest#testDiscoverPartitionsPeriodically` method 
to test whether new partitions use the EARLIEST offset while initial partitions 
use the specified offset strategy.
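
   The backward-compatibility check in item 1 can be pictured with a minimal 
sketch. The wire layout, the version numbers, and every name below 
(`EnumStateCompatSketch`, `serializeOld`, `serializeNew`, `deserialize`) are 
assumptions for illustration and do not reflect the actual 
`KafkaSourceEnumStateSerializer` format. The point is only that state written in 
an older layout, which has no status field, can be read back with every 
partition treated as already assigned.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical versioned state format; not the connector's real layout. */
final class EnumStateCompatSketch {

    enum AssignStatus { ASSIGNED, UNASSIGNED_INITIAL }

    static final int OLD_VERSION = 1; // assumed: partition names only
    static final int NEW_VERSION = 2; // assumed: partition name plus status code

    /** Writes the assumed old layout: a count followed by partition names. */
    static byte[] serializeOld(List<String> assignedPartitions) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(assignedPartitions.size());
            for (String partition : assignedPartitions) {
                out.writeUTF(partition);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Writes the assumed new layout: each name is followed by a status code. */
    static byte[] serializeNew(Map<String, AssignStatus> partitions) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(partitions.size());
            for (Map.Entry<String, AssignStatus> entry : partitions.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue() == AssignStatus.ASSIGNED ? 0 : 1);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either layout; old state carries no status, so it maps to ASSIGNED. */
    static Map<String, AssignStatus> deserialize(int version, byte[] data) {
        if (version != OLD_VERSION && version != NEW_VERSION) {
            throw new IllegalArgumentException("Unknown version: " + version);
        }
        Map<String, AssignStatus> result = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String partition = in.readUTF();
                AssignStatus status = AssignStatus.ASSIGNED;
                if (version == NEW_VERSION) {
                    status = in.readInt() == 0
                            ? AssignStatus.ASSIGNED
                            : AssignStatus.UNASSIGNED_INITIAL;
                }
                result.put(partition, status);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

   A compatibility test in this style would serialize with the old layout, 
deserialize with the new reader, and assert that no partition is lost and that 
each one is reported as assigned.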

