This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 49eaa3075ac [fix][io] Close the kafka source connector if there is uncaught exception (#20479)
49eaa3075ac is described below

commit 49eaa3075ac39ceaa5cfb0ac3a8b9f35d1163bd4
Author: Zike Yang <[email protected]>
AuthorDate: Tue Jun 6 17:10:14 2023 +0800

    [fix][io] Close the kafka source connector if there is uncaught exception (#20479)
    
    Signed-off-by: Zike Yang <[email protected]>
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  9 ++++++++-
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 23 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 661c747871d..eeaa89c601e 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -182,7 +182,14 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
             }
         });
         runnerThread.setUncaughtExceptionHandler(
-                (t, e) -> LOG.error("[{}] Error while consuming records", t.getName(), e));
+                (t, e) -> {
+                    LOG.error("[{}] Error while consuming records", t.getName(), e);
+                    try {
+                        this.close();
+                    } catch (InterruptedException ex) {
+                        // The interrupted exception is thrown by the runnerThread itself. Ignore it.
+                    }
+                });
         runnerThread.setName("Kafka Source Thread");
         runnerThread.start();
     }
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index a9a5c22eb41..d706c389907 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.io.kafka.source;
 
 
 import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.lang.reflect.Field;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -28,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -153,6 +157,25 @@ public class KafkaAbstractSourceTest {
         assertEquals(config.getSslTruststorePassword(), "cert_pwd");
     }
 
+    @Test
+    public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception {
+        KafkaAbstractSource source = new DummySource();
+        Consumer consumer = mock(Consumer.class);
+        Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer)
+                .subscribe(Mockito.any(Collection.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+
+        source.start();
+
+        Field runningField = KafkaAbstractSource.class.getDeclaredField("running");
+        runningField.setAccessible(true);
+
+        Assert.assertFalse((boolean) runningField.get(source));
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());

Reply via email to