shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r266258426
##########
File path:
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
##########
@@ -60,16 +74,22 @@
private final Config config;
private final boolean fetchThresholdBytesEnabled;
private final KafkaSystemConsumerMetrics metrics;
+ private final KafkaStartpointRegistrationHandler
kafkaStartpointRegistrationHandler;
// This sink is used to transfer the messages from the proxy/consumer to the
BlockingEnvelopeMap.
final KafkaConsumerMessageSink messageSink;
// This proxy contains a separate thread, which reads kafka messages (with
consumer.poll()) and populates
// BlockingEnvelopMap's buffers.
- final private KafkaConsumerProxy proxy;
+ @VisibleForTesting
+ KafkaConsumerProxy proxy;
// keep registration data until the start - mapping between registered SSPs
and topicPartitions, and their offsets
final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
Review comment:
1. Created PR: 954 which addresses the comment of moving the refactor into
separate PR.
2. Added the rationale in description of PR: 954 as to why this refactor
would not break any existing functionality.
* `CoordinatorStreamSystemConsumer`: Reads the coordinator stream from
oldest offset. Please check
here(https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java#L128)
for more details.
* `CoordinatorStreamStore`: Reads the coordinator stream from oldest
offset. Please check
[here](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java#L183)
for more details.
* `ContainerStorageManager`: Chooses the lowest offset before
registering with consumer. [Relevant
code](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java#L68)
.
Please let me know if there're any other questions.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services