This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 989d3ce07f MINOR: Small cleanups in connect/mirror (#12113)
989d3ce07f is described below

commit 989d3ce07f1848d4c0b9fbb116ff0cf9b3b382d7
Author: Mickael Maison <[email protected]>
AuthorDate: Tue May 10 08:49:56 2022 +0200

    MINOR: Small cleanups in connect/mirror (#12113)
    
    Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  2 +-
 .../apache/kafka/connect/mirror/OffsetSync.java    |  6 +++---
 .../kafka/connect/mirror/OffsetSyncStore.java      |  6 +++---
 .../org/apache/kafka/connect/mirror/Scheduler.java | 22 +++++++++++-----------
 .../MirrorConnectorsIntegrationBaseTest.java       |  2 +-
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 47631998fb..30fb695d92 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -105,7 +105,7 @@ public class MirrorCheckpointTask extends SourceTask {
     }
 
     @Override
-    public void commit() throws InterruptedException {
+    public void commit() {
         // nop
     }
 
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
index 68e6441f18..e1ecb1e1db 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -39,9 +39,9 @@ public class OffsetSync {
             new Field(TOPIC_KEY, Type.STRING),
             new Field(PARTITION_KEY, Type.INT32));
 
-    private TopicPartition topicPartition;
-    private long upstreamOffset;
-    private long downstreamOffset;
+    private final TopicPartition topicPartition;
+    private final long upstreamOffset;
+    private final long downstreamOffset;
 
     public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
         this.topicPartition = topicPartition;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 600dda46f3..9152cd5aa0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -30,9 +30,9 @@ import java.time.Duration;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private KafkaConsumer<byte[], byte[]> consumer;
-    private Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private TopicPartition offsetSyncTopicPartition;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
+    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicPartition offsetSyncTopicPartition;
 
     OffsetSyncStore(MirrorConnectorConfig config) {
         consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 20f2ca7e2c..0644d6a6c6 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class Scheduler implements AutoCloseable {
-    private static Logger log = LoggerFactory.getLogger(Scheduler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
 
     private final String name;
     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@@ -62,11 +62,11 @@ class Scheduler implements AutoCloseable {
         try {
             executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-            log.warn("{} was interrupted running task: {}", name, description);
+            LOG.warn("{} was interrupted running task: {}", name, description);
         } catch (TimeoutException e) {
-            log.error("{} timed out running task: {}", name, description);
+            LOG.error("{} timed out running task: {}", name, description);
         } catch (Throwable e) {
-            log.error("{} caught exception in task: {}", name, description, e);
+            LOG.error("{} caught exception in task: {}", name, description, e);
         }
     } 
 
@@ -76,10 +76,10 @@ class Scheduler implements AutoCloseable {
         try {
             boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
             if (!terminated) {
-                log.error("{} timed out during shutdown of internal scheduler.", name);
+                LOG.error("{} timed out during shutdown of internal scheduler.", name);
             }
         } catch (InterruptedException e) {
-            log.warn("{} was interrupted during shutdown of internal scheduler.", name);
+            LOG.warn("{} was interrupted during shutdown of internal scheduler.", name);
         }
     }
 
@@ -92,21 +92,21 @@ class Scheduler implements AutoCloseable {
             long start = System.currentTimeMillis();
             task.run();
             long elapsed = System.currentTimeMillis() - start;
-            log.info("{} took {} ms", description, elapsed);
+            LOG.info("{} took {} ms", description, elapsed);
             if (elapsed > timeout.toMillis()) {
-                log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
+                LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
             }
         } catch (InterruptedException e) {
-            log.warn("{} was interrupted running task: {}", name, description);
+            LOG.warn("{} was interrupted running task: {}", name, description);
         } catch (Throwable e) {
-            log.error("{} caught exception in scheduled task: {}", name, description, e);
+            LOG.error("{} caught exception in scheduled task: {}", name, description, e);
         }
     }
 
     private void executeThread(Task task, String description) {
         Thread.currentThread().setName(name + "-" + description);
         if (closed) {
-            log.info("{} skipping task due to shutdown: {}", name, description);
+            LOG.info("{} skipping task due to shutdown: {}", name, description);
             return;
         }
         run(task, description);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 8f692ca911..f325e15695 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -729,7 +729,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
     /*
      * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
+    protected void warmUpConsumer(Map<String, Object> consumerProps) {
         Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
         dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
         dummyConsumer.commitSync();

Reply via email to