This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 911fbf5fa2b [fix][io] Not restart instance when kafka source poll exception. (#20795)
911fbf5fa2b is described below

commit 911fbf5fa2b49825d7dcbf2270f0329a5267a2fa
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 13 21:11:06 2023 +0800

    [fix][io] Not restart instance when kafka source poll exception. (#20795)
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       | 53 ++++++++--------------
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 43 +++++++++++++-----
 2 files changed, 52 insertions(+), 44 deletions(-)
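
The patch drops the runner thread's uncaught-exception handler, which closed (and thereby restarted) the connector, and instead wraps the poll-and-consume loop in a try/catch that logs the failure, reports it through notifyError(), and stops the loop. Below is a minimal, self-contained sketch of that pattern, not the Pulsar code itself; the reportError callback and pollOnce step are hypothetical stand-ins for PushSource.notifyError() and the Kafka poll/dispatch logic.

    import java.util.function.Consumer;

    // Simplified sketch of the new error-handling pattern: a polling loop that
    // reports failures to the framework and stops, instead of closing and
    // restarting itself. "reportError" stands in for PushSource.notifyError().
    public class PollLoopSketch {

        private volatile boolean running;
        private Thread runnerThread;

        public void start(Runnable pollOnce, Consumer<Exception> reportError) {
            runnerThread = new Thread(() -> {
                while (running) {
                    try {
                        pollOnce.run();            // poll + dispatch one batch of records
                    } catch (Exception e) {
                        reportError.accept(e);     // surface the failure to the caller
                        break;                     // stop the loop; do not self-restart
                    }
                }
            });
            running = true;
            runnerThread.setName("Poll Loop Sketch Thread");
            runnerThread.start();
        }

        public static void main(String[] args) {
            PollLoopSketch sketch = new PollLoopSketch();
            sketch.start(
                    () -> { throw new RuntimeException("poll failed"); },
                    e -> System.err.println("reported to framework: " + e.getMessage()));
        }
    }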

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 012e4143744..3d4612c039f 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -133,7 +132,6 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
             throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex);
         }
         this.start();
-        running = true;
     }
 
     protected Properties beforeCreateConsumer(Properties props) {
@@ -158,47 +156,36 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
 
     @SuppressWarnings("unchecked")
     public void start() {
+        LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic());
+        consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
-            consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
             while (running) {
-                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
-                CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
-                int index = 0;
-                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
-                    KafkaRecord record = buildRecord(consumerRecord);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
+                try {
+                    ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
+                    CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
+                    int index = 0;
+                    for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                        KafkaRecord record = buildRecord(consumerRecord);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
+                        }
+                        consume(record);
+                        futures[index] = record.getCompletableFuture();
+                        index++;
                     }
-                    consume(record);
-                    futures[index] = record.getCompletableFuture();
-                    index++;
-                }
-                if (!kafkaSourceConfig.isAutoCommitEnabled()) {
-                    try {
+                    if (!kafkaSourceConfig.isAutoCommitEnabled()) {
                         CompletableFuture.allOf(futures).get();
                         consumer.commitSync();
-                    } catch (InterruptedException ex) {
-                        break;
-                    } catch (ExecutionException ex) {
-                        LOG.error("Error while processing records", ex);
-                        break;
                     }
+                } catch (Exception e) {
+                    LOG.error("Error while processing records", e);
+                    notifyError(e);
+                    break;
                 }
             }
         });
-        runnerThread.setUncaughtExceptionHandler(
-                (t, e) -> {
-                    new Thread(() -> {
-                        LOG.error("[{}] Error while consuming records", t.getName(), e);
-                        try {
-                            this.close();
-                        } catch (Exception ex) {
-                            LOG.error("[{}] Close kafka source error", t.getName(), e);
-                        }
-                    }, "Kafka Source Close Task Thread").start();
-                });
+        running = true;
         runnerThread.setName("Kafka Source Thread");
         runnerThread.start();
     }
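
The updated tests below depend on how that reported error reaches the caller: after notifyError(e), the next call to read() rethrows the exception, so the function framework fails the instance instead of the connector restarting itself. A hypothetical sketch of that hand-off (not Pulsar's actual PushSource internals, only the contract the tests exercise):

    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical sketch of the notifyError()/read() hand-off assumed by the tests:
    // an error reported by the poll thread is rethrown from the next read() call.
    public class ErrorPropagationSketch<T> {

        private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private volatile Exception pendingError;

        // Stand-in for notifyError(): remember the failure from the poll thread.
        public void notifyError(Exception e) {
            pendingError = e;
        }

        // Stand-in for PushSource.read(): rethrow a reported failure instead of blocking.
        public T read() throws Exception {
            if (pendingError != null) {
                throw pendingError;
            }
            return queue.take();
        }

        public static void main(String[] args) {
            ErrorPropagationSketch<String> source = new ErrorPropagationSketch<>();
            source.notifyError(new RuntimeException("Pool exception"));
            try {
                source.read();
            } catch (Exception e) {
                System.out.println("read() surfaced: " + e.getMessage());
            }
        }
    }
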
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 402727f4ec0..6911ec2a6bf 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.source;
 
 
 import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.lang.reflect.Field;
@@ -31,7 +32,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
-import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -158,26 +158,47 @@ public class KafkaAbstractSourceTest {
         assertEquals(config.getSslTruststorePassword(), "cert_pwd");
     }
 
-    @Test
-    public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception {
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception")
+    public final void throwExceptionBySubscribe() throws Exception {
         KafkaAbstractSource source = new DummySource();
+
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
         Consumer consumer = mock(Consumer.class);
-        Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer)
+        Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer)
                 .subscribe(Mockito.any(Collection.class));
 
         Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
         consumerField.setAccessible(true);
         consumerField.set(source, consumer);
-
+        // will throw RuntimeException.
         source.start();
+    }
+
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception")
+    public final void throwExceptionByPoll() throws Exception {
+        KafkaAbstractSource source = new DummySource();
 
-        Field runningField = KafkaAbstractSource.class.getDeclaredField("running");
-        runningField.setAccessible(true);
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
 
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertFalse((boolean) runningField.get(source));
-            Assert.assertNull(consumerField.get(source));
-        });
+        Consumer consumer = mock(Consumer.class);
+        Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer)
+                .poll(Mockito.any(Duration.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+        source.start();
+        // will throw RuntimeException.
+        source.read();
     }
 
     private File getFile(String name) {
