ZihanLi58 commented on code in PR #3570:
URL: https://github.com/apache/gobblin/pull/3570#discussion_r982929378


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -123,19 +123,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) {
     this.numThreads = numThreads;
     this.config = config.withFallback(FALLBACK);
     this.gobblinKafkaConsumerClient = createConsumerClient(this.config);
-    // On Partition rebalance, commit exisiting offsets and reset.
-    this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
-      @Override
-      public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
-        copyAndCommit();
-        partitionOffsetsToCommit.clear();
-      }
-
-      @Override
-      public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
-        // No op
-      }
-    });
+    initializeConsumerClient();

Review Comment:
   Should this method be renamed to something like `assignTopicPartitions`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -74,7 +76,15 @@ public String load(String key) throws Exception {
   protected GobblinServiceJobScheduler scheduler;
 
   public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
-    super(topic, config, numThreads);
+    // Differentiate group id for each host
+    super(topic, config.withValue(GROUP_ID_KEY,
+        ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
+        numThreads);
+  }
+
+  @Override
+  protected void initializeConsumerClient() {

Review Comment:
   Could you add a comment here noting that we expect the client to consume all 
partitions in the topic from the latest offset? You could even move that part 
of the logic here.



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java:
##########
@@ -169,6 +169,8 @@ default long committed(KafkaPartition partition) {
 
   public default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }
 
+  public default void initializeClient(String topic) { return; }

Review Comment:
   What's this method for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
