This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1ac19fceac7 [improve][io] PIP-297: Support terminating Function &
Connector with the fatal exception (#21143)
1ac19fceac7 is described below
commit 1ac19fceac72f535008f28880e2ae5f9a42e3334
Author: Zike Yang <[email protected]>
AuthorDate: Thu Sep 14 09:51:48 2023 +0800
[improve][io] PIP-297: Support terminating Function & Connector with the
fatal exception (#21143)
PIP: #21079
### Motivation
Currently, the connector and function cannot terminate the function
instance if there are fatal exceptions thrown
outside the function instance thread. The current implementation of the
connector and Pulsar Function exception handler
cannot handle the fatal exceptions that are thrown outside the function
instance thread.
For example, suppose we have a sink connector that uses its own threads to
batch-sink the data to an external system. If
any fatal exceptions occur in those threads, the function instance thread
will not be aware of them and will
not be able to terminate the connector. This will cause the connector to
hang indefinitely. There is a related issue
here: https://github.com/apache/pulsar/issues/9464
The same problem exists for the source connector. The source connector may
also use a separate thread to fetch data from
an external system. If any fatal exceptions happen in that thread, the
connector will also hang forever. This issue has
been observed for the Kafka source connector:
https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
the notifyError method to the `PushSource` class in PIP-281:
https://github.com/apache/pulsar/pull/20807. However, this
does not solve the same problem that all source connectors face because not
all connectors are implemented based on
the `PushSource` class.
The problem is same for the Pulsar Function. Currently, the function can't
throw fatal exceptions to the function
framework. We need to provide a way for the function developer to implement
it.
We need a way for the connector and function developers to throw fatal
exceptions outside the function instance
thread. The function framework should catch these exceptions and terminate
the function accordingly.
### Modifications
Introduce a new method `fatal` to the context. All the connector
implementation code and the function code
can use this context and call the `fatal` method to terminate the instance
while raising a fatal exception.
After the connector or function raises the fatal exception, the function
instance thread will be interrupted.
The function framework then could catch the exception, log it, and then
terminate the function instance.
---
.../apache/pulsar/functions/api/BaseContext.java | 6 +
.../pulsar/functions/instance/ContextImpl.java | 12 +-
.../functions/instance/JavaInstanceRunnable.java | 42 ++++--
.../pulsar/functions/instance/ContextImplTest.java | 32 +++-
.../instance/JavaInstanceRunnableTest.java | 166 ++++++++++++++++++++-
.../apache/pulsar/io/common/IOConfigUtilsTest.java | 22 ++-
.../io/kafka/sink/KafkaAbstractSinkTest.java | 11 +-
7 files changed, 262 insertions(+), 29 deletions(-)
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
index 25874c595d9..185031fa29d 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
@@ -217,4 +217,10 @@ public interface BaseContext {
throw new UnsupportedOperationException("not implemented");
}
+ /**
+ * Terminate the function instance with a fatal exception.
+ *
+ * @param t the fatal exception to be raised
+ */
+ void fatal(Throwable t);
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d03f57e9720..6664a00510e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -137,12 +137,14 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
private final Function.FunctionDetails.ComponentType componentType;
+ private final java.util.function.Consumer<Throwable> fatalHandler;
+
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType,
ComponentStatsManager statsManager,
- StateManager stateManager, PulsarAdmin pulsarAdmin,
ClientBuilder clientBuilder)
- throws PulsarClientException {
+ StateManager stateManager, PulsarAdmin pulsarAdmin,
ClientBuilder clientBuilder,
+ java.util.function.Consumer<Throwable> fatalHandler) {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
@@ -150,6 +152,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
this.pulsarAdmin = pulsarAdmin;
this.topicSchema = new TopicSchema(client,
Thread.currentThread().getContextClassLoader());
this.statsManager = statsManager;
+ this.fatalHandler = fatalHandler;
this.producerBuilder = (ProducerBuilderImpl<?>)
client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
@@ -534,6 +537,11 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
return clientBuilder;
}
+ @Override
+ public void fatal(Throwable t) {
+ fatalHandler.accept(t);
+ }
+
private <T> Producer<T> getProducer(String topicName, Schema<T> schema)
throws PulsarClientException {
Producer<T> producer;
if (tlPublishProducers != null) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 691e547256a..b3850cbb53d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -132,7 +132,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private JavaInstance javaInstance;
@Getter
- private Throwable deathException;
+ private volatile Throwable deathException;
// function stats
private ComponentStatsManager stats;
@@ -282,9 +282,14 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
+ Thread currentThread = Thread.currentThread();
+ Consumer<Throwable> fatalHandler = throwable -> {
+ this.deathException = throwable;
+ currentThread.interrupt();
+ };
return new ContextImpl(instanceConfig, instanceLog, client,
secretsProvider,
collectorRegistry, metricsLabels, this.componentType,
this.stats, stateManager,
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, fatalHandler);
}
public interface AsyncResultConsumer {
@@ -340,16 +345,35 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
// process the synchronous results
handleResult(currentRecord, result);
}
+
+ if (deathException != null) {
+ // Ideally the current java instance thread will be
interrupted when the deathException is set.
+ // But if the CompletableFuture returned by the Pulsar
Function is completed exceptionally(the
+ // function has invoked the fatal method) before being put
into the JavaInstance
+ // .pendingAsyncRequests, the interrupted exception may be
thrown when putting this future to
+ // JavaInstance.pendingAsyncRequests. The interrupted
exception would be caught by the JavaInstance
+ // and be skipped.
+ // Therefore, we need to handle this case by checking the
deathException here and rethrow it.
+ throw deathException;
+ }
}
} catch (Throwable t) {
- log.error("[{}] Uncaught exception in Java Instance",
FunctionCommon.getFullyQualifiedInstanceId(
- instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName(),
- instanceConfig.getInstanceId()), t);
- deathException = t;
+ if (deathException != null) {
+ log.error("[{}] Fatal exception occurred in the instance",
FunctionCommon.getFullyQualifiedInstanceId(
+ instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName(),
+ instanceConfig.getInstanceId()), deathException);
+ } else {
+ log.error("[{}] Uncaught exception in Java Instance",
FunctionCommon.getFullyQualifiedInstanceId(
+ instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName(),
+ instanceConfig.getInstanceId()), t);
+ deathException = t;
+ }
if (stats != null) {
- stats.incrSysExceptions(t);
+ stats.incrSysExceptions(deathException);
}
} finally {
log.info("Closing instance");
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 90f7df37fa1..6516b9284c9 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
@@ -120,7 +121,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, t -> {});
context.setCurrentMessageContext((Record<String>) () -> null);
}
@@ -234,7 +235,7 @@ public class ContextImplTest {
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, t -> {});
context.getPulsarAdmin();
}
@@ -248,7 +249,7 @@ public class ContextImplTest {
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, t -> {});
try {
context.seek("z", 0, Mockito.mock(MessageId.class));
Assert.fail("Expected exception");
@@ -279,7 +280,7 @@ public class ContextImplTest {
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, t -> {});
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -311,7 +312,7 @@ public class ContextImplTest {
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, t -> {});
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -335,7 +336,7 @@ public class ContextImplTest {
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin, clientBuilder);
+ pulsarAdmin, clientBuilder, t -> {});
ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
@@ -438,4 +439,23 @@ public class ContextImplTest {
assertEquals(record.getProperties().get("prop-key"), "prop-value");
assertNull(record.getValue());
}
+
+ @Test
+ public void testFatal() {
+ Throwable fatalException = new Exception("test-fatal-exception");
+ AtomicBoolean fatalInvoked = new AtomicBoolean(false);
+ context = new ContextImpl(
+ config,
+ logger,
+ client,
+ new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(),
+ new String[0],
+ FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
+ pulsarAdmin, clientBuilder, t -> {
+ assertEquals(t, fatalException);
+ fatalInvoked.set(true);
+ });
+ context.fatal(fatalException);
+ assertTrue(fatalInvoked.get());
+ }
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 0ba1d24ba74..134e77a3b58 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -24,18 +24,22 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -46,8 +50,14 @@ import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -91,6 +101,23 @@ public class JavaInstanceRunnableTest {
return javaInstanceRunnable;
}
+ private JavaInstanceRunnable createRunnable(SourceSpec sourceSpec,
+ String functionClassName,
SinkSpec sinkSpec)
+ throws PulsarClientException {
+ ClientBuilder clientBuilder = mock(ClientBuilder.class);
+ when(clientBuilder.build()).thenReturn(null);
+ FunctionDetails functionDetails = FunctionDetails.newBuilder()
+ .setSource(sourceSpec)
+ .setClassName(functionClassName)
+ .setSink(sinkSpec)
+ .build();
+ InstanceConfig config = createInstanceConfig(functionDetails);
+ config.setClusterName("test-cluster");
+ return new JavaInstanceRunnable(config, clientBuilder,
+
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null,
null, null, null, null,
+ Thread.currentThread().getContextClassLoader(), null);
+ }
+
private Method makeAccessible(JavaInstanceRunnable javaInstanceRunnable)
throws Exception {
Method method =
javaInstanceRunnable.getClass().getDeclaredMethod("setupSerDe", Class[].class,
ClassLoader.class);
@@ -333,4 +360,137 @@ public class JavaInstanceRunnableTest {
.getBeanProperties(ConnectorTestConfig2.class);
Assert.assertEquals(new TreeSet<>(beanProperties), new
TreeSet<>(Arrays.asList("field1", "withGetter")));
}
+
+ public static class TestSourceConnector implements Source<String> {
+
+ private LinkedBlockingQueue<Record<String>> queue;
+ private SourceContext context;
+
+ public void pushRecord(Record<String> record) throws Exception {
+ queue.put(record);
+ }
+
+ @Override
+ public void open(Map config, SourceContext sourceContext) throws
Exception {
+ context = sourceContext;
+ queue = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public Record<String> read() throws Exception {
+ return queue.take();
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ public void fatalConnector() {
+ context.fatal(new
Exception(FailComponentType.FAIL_SOURCE.toString()));
+ }
+ }
+
+ public static class TestFunction implements Function<String,
CompletableFuture<String>> {
+ @Override
+ public CompletableFuture<String> process(String input, Context
context) throws Exception {
+ CompletableFuture<String> future = new CompletableFuture<>();
+ new Thread(() -> {
+ if (FailComponentType.FAIL_FUNC.toString().equals(input)) {
+ context.fatal(new
Exception(FailComponentType.FAIL_FUNC.toString()));
+ } else {
+ future.complete(input);
+ }
+ }).start();
+ return future;
+ }
+ }
+
+ public static class TestSinkConnector implements Sink<String> {
+ SinkContext context;
+
+ @Override
+ public void open(Map config, SinkContext sinkContext) throws Exception
{
+ this.context = sinkContext;
+ }
+
+ @Override
+ public void write(Record<String> record) throws Exception {
+ new Thread(() -> {
+ if
(FailComponentType.FAIL_SINK.toString().equals(record.getValue())) {
+ context.fatal(new
Exception(FailComponentType.FAIL_SINK.toString()));
+ }
+ }).start();
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
+
+ private Object getPrivateField(JavaInstanceRunnable javaInstanceRunnable,
String fieldName)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = JavaInstanceRunnable.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(javaInstanceRunnable);
+ }
+
+ public enum FailComponentType {
+ FAIL_SOURCE,
+ FAIL_FUNC,
+ FAIL_SINK
+ }
+
+ @DataProvider(name = "failComponentType")
+ public Object[][] failType() {
+ return new Object[][]{{FailComponentType.FAIL_SOURCE},
{FailComponentType.FAIL_FUNC},
+ {FailComponentType.FAIL_SINK}};
+ }
+
+ @Test(dataProvider = "failComponentType")
+ public void testFatalTheInstance(FailComponentType failComponentType)
throws Exception {
+ JavaInstanceRunnable javaInstanceRunnable = createRunnable(
+ SourceSpec.newBuilder()
+
.setClassName(TestSourceConnector.class.getName()).build(),
+ TestFunction.class.getName(),
+
SinkSpec.newBuilder().setClassName(TestSinkConnector.class.getName()).build()
+ );
+
+ Thread fnThread = new Thread(javaInstanceRunnable);
+ fnThread.start();
+
+ // Wait for the setup to complete
+ AtomicReference<TestSourceConnector> source = new AtomicReference<>();
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(200))
+ .atMost(Duration.ofSeconds(10))
+ .ignoreExceptions().untilAsserted(() -> {
+ TestSourceConnector sourceConnector =
(TestSourceConnector) getPrivateField(javaInstanceRunnable,
+ "source");
+ Assert.assertNotNull(sourceConnector);
+ source.set(sourceConnector);
+ });
+
+ // Fail the connector or function
+ if (failComponentType == FailComponentType.FAIL_SOURCE) {
+ source.get().fatalConnector();
+ } else {
+ source.get().pushRecord(failComponentType::toString);
+ }
+
+ // Assert that the instance is terminated with the fatal exception
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(200))
+ .atMost(Duration.ofSeconds(10))
+ .ignoreExceptions().untilAsserted(() -> {
+
Assert.assertNotNull(javaInstanceRunnable.getDeathException());
+
Assert.assertEquals(javaInstanceRunnable.getDeathException().getMessage(),
+ failComponentType.toString());
+
+ // Assert the java instance is closed
+ Assert.assertFalse(fnThread.isAlive());
+ Assert.assertFalse((boolean)
getPrivateField(javaInstanceRunnable, "isInitialized"));
+ });
+ }
}
diff --git
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 52afac1a5ac..fd291a8a3c9 100644
---
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -259,12 +259,12 @@ public class IOConfigUtilsTest {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
-
+
@Override
public void deleteState(String key) {
-
+
}
-
+
@Override
public CompletableFuture<Void> deleteStateAsync(String key) {
return null;
@@ -284,6 +284,11 @@ public class IOConfigUtilsTest {
public PulsarClient getPulsarClient() {
return null;
}
+
+ @Override
+ public void fatal(Throwable t) {
+
+ }
}
@Test
@@ -449,12 +454,12 @@ public class IOConfigUtilsTest {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
-
+
@Override
public void deleteState(String key) {
-
+
}
-
+
@Override
public CompletableFuture<Void> deleteStateAsync(String key) {
return null;
@@ -464,6 +469,11 @@ public class IOConfigUtilsTest {
public PulsarClient getPulsarClient() {
return null;
}
+
+ @Override
+ public void fatal(Throwable t) {
+
+ }
}
@Test
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index d59cdb1d9b6..9537b6576b4 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -164,12 +164,12 @@ public class KafkaAbstractSinkTest {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
-
+
@Override
public void deleteState(String key) {
-
+
}
-
+
@Override
public CompletableFuture<Void> deleteStateAsync(String key) {
return null;
@@ -179,6 +179,11 @@ public class KafkaAbstractSinkTest {
public PulsarClient getPulsarClient() {
return null;
}
+
+ @Override
+ public void fatal(Throwable t) {
+
+ }
};
ThrowingRunnable openAndClose = ()->{
try {