This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 49eaa3075ac [fix][io] Close the kafka source connector if there is
uncaught exception (#20479)
49eaa3075ac is described below
commit 49eaa3075ac39ceaa5cfb0ac3a8b9f35d1163bd4
Author: Zike Yang <[email protected]>
AuthorDate: Tue Jun 6 17:10:14 2023 +0800
[fix][io] Close the kafka source connector if there is uncaught exception
(#20479)
Signed-off-by: Zike Yang <[email protected]>
---
.../pulsar/io/kafka/KafkaAbstractSource.java | 9 ++++++++-
.../io/kafka/source/KafkaAbstractSourceTest.java | 23 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 661c747871d..eeaa89c601e 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -182,7 +182,14 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
}
});
runnerThread.setUncaughtExceptionHandler(
- (t, e) -> LOG.error("[{}] Error while consuming records",
t.getName(), e));
+ (t, e) -> {
+ LOG.error("[{}] Error while consuming records",
t.getName(), e);
+ try {
+ this.close();
+ } catch (InterruptedException ex) {
+ // The interrupted exception is thrown by the
runnerThread itself. Ignore it.
+ }
+ });
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
}
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index a9a5c22eb41..d706c389907 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.io.kafka.source;
import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.lang.reflect.Field;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -28,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -153,6 +157,25 @@ public class KafkaAbstractSourceTest {
assertEquals(config.getSslTruststorePassword(), "cert_pwd");
}
+ @Test
+ public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws
Exception {
+ KafkaAbstractSource source = new DummySource();
+ Consumer consumer = mock(Consumer.class);
+ Mockito.doThrow(new RuntimeException("Uncaught
exception")).when(consumer)
+ .subscribe(Mockito.any(Collection.class));
+
+ Field consumerField =
KafkaAbstractSource.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(source, consumer);
+
+ source.start();
+
+ Field runningField =
KafkaAbstractSource.class.getDeclaredField("running");
+ runningField.setAccessible(true);
+
+ Assert.assertFalse((boolean) runningField.get(source));
+ }
+
private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());