This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ac19fceac7 [improve][io] PIP-297: Support terminating Function & 
Connector with the fatal exception (#21143)
1ac19fceac7 is described below

commit 1ac19fceac72f535008f28880e2ae5f9a42e3334
Author: Zike Yang <[email protected]>
AuthorDate: Thu Sep 14 09:51:48 2023 +0800

    [improve][io] PIP-297: Support terminating Function & Connector with the 
fatal exception (#21143)
    
    PIP: #21079
    
    ### Motivation
    
    Currently, the connector and function cannot terminate the function 
instance if there are fatal exceptions thrown
    outside the function instance thread. The current implementation of the 
connector and Pulsar Function exception handler
    cannot handle the fatal exceptions that are thrown outside the function 
instance thread.
    
    For example, suppose we have a sink connector that uses its own threads to 
batch-sink the data to an external system. If
    any fatal exceptions occur in those threads, the function instance thread 
will not be aware of them and will
    not be able to terminate the connector. This will cause the connector to 
hang indefinitely. There is a related issue
    here: https://github.com/apache/pulsar/issues/9464
    
    The same problem exists for the source connector. The source connector may 
also use a separate thread to fetch data from
    an external system. If any fatal exceptions happen in that thread, the 
connector will also hang forever. This issue has
    been observed for the Kafka source connector: 
https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
    the notifyError method to the `PushSource` class in PIP-281: 
https://github.com/apache/pulsar/pull/20807. However, this
    does not solve the same problem that all source connectors face because not 
all connectors are implemented based on
    the `PushSource` class.
    
    The problem is same for the Pulsar Function. Currently, the function can't 
throw fatal exceptions to the function
    framework. We need to provide a way for the function developer to implement 
it.
    
    We need a way for the connector and function developers to throw fatal 
exceptions outside the function instance
    thread. The function framework should catch these exceptions and terminate 
the function accordingly.
    
    ### Modifications
    
     Introduce a new method `fatal` to the context. All the connector 
implementation code and the function code
     can use this context and call the `fatal` method to terminate the instance 
while raising a fatal exception.
    
     After the connector or function raises the fatal exception, the function 
instance thread will be interrupted.
     The function framework then could catch the exception, log it, and then 
terminate the function instance.
---
 .../apache/pulsar/functions/api/BaseContext.java   |   6 +
 .../pulsar/functions/instance/ContextImpl.java     |  12 +-
 .../functions/instance/JavaInstanceRunnable.java   |  42 ++++--
 .../pulsar/functions/instance/ContextImplTest.java |  32 +++-
 .../instance/JavaInstanceRunnableTest.java         | 166 ++++++++++++++++++++-
 .../apache/pulsar/io/common/IOConfigUtilsTest.java |  22 ++-
 .../io/kafka/sink/KafkaAbstractSinkTest.java       |  11 +-
 7 files changed, 262 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
index 25874c595d9..185031fa29d 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
@@ -217,4 +217,10 @@ public interface BaseContext {
         throw new UnsupportedOperationException("not implemented");
     }
 
+    /**
+     * Terminate the function instance with a fatal exception.
+     *
+     * @param t the fatal exception to be raised
+     */
+    void fatal(Throwable t);
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d03f57e9720..6664a00510e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -137,12 +137,14 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
 
     private final Function.FunctionDetails.ComponentType componentType;
 
+    private final java.util.function.Consumer<Throwable> fatalHandler;
+
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
                        SecretsProvider secretsProvider, 
FunctionCollectorRegistry collectorRegistry,
                        String[] metricsLabels,
                        Function.FunctionDetails.ComponentType componentType, 
ComponentStatsManager statsManager,
-                       StateManager stateManager, PulsarAdmin pulsarAdmin, 
ClientBuilder clientBuilder)
-            throws PulsarClientException {
+                       StateManager stateManager, PulsarAdmin pulsarAdmin, 
ClientBuilder clientBuilder,
+                       java.util.function.Consumer<Throwable> fatalHandler) {
         this.config = config;
         this.logger = logger;
         this.clientBuilder = clientBuilder;
@@ -150,6 +152,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
         this.pulsarAdmin = pulsarAdmin;
         this.topicSchema = new TopicSchema(client, 
Thread.currentThread().getContextClassLoader());
         this.statsManager = statsManager;
+        this.fatalHandler = fatalHandler;
 
         this.producerBuilder = (ProducerBuilderImpl<?>) 
client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
@@ -534,6 +537,11 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
         return clientBuilder;
     }
 
+    @Override
+    public void fatal(Throwable t) {
+        fatalHandler.accept(t);
+    }
+
     private <T> Producer<T> getProducer(String topicName, Schema<T> schema) 
throws PulsarClientException {
         Producer<T> producer;
         if (tlPublishProducers != null) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 691e547256a..b3850cbb53d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -132,7 +132,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     private JavaInstance javaInstance;
     @Getter
-    private Throwable deathException;
+    private volatile Throwable deathException;
 
     // function stats
     private ComponentStatsManager stats;
@@ -282,9 +282,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     ContextImpl setupContext() throws PulsarClientException {
         Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
+        Thread currentThread = Thread.currentThread();
+        Consumer<Throwable> fatalHandler = throwable -> {
+            this.deathException = throwable;
+            currentThread.interrupt();
+        };
         return new ContextImpl(instanceConfig, instanceLog, client, 
secretsProvider,
                 collectorRegistry, metricsLabels, this.componentType, 
this.stats, stateManager,
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, fatalHandler);
     }
 
     public interface AsyncResultConsumer {
@@ -340,16 +345,35 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     // process the synchronous results
                     handleResult(currentRecord, result);
                 }
+
+                if (deathException != null) {
+                    // Ideally the current java instance thread will be 
interrupted when the deathException is set.
+                    // But if the CompletableFuture returned by the Pulsar 
Function is completed exceptionally(the
+                    // function has invoked the fatal method) before being put 
into the JavaInstance
+                    // .pendingAsyncRequests, the interrupted exception may be 
thrown when putting this future to
+                    // JavaInstance.pendingAsyncRequests. The interrupted 
exception would be caught by the JavaInstance
+                    // and be skipped.
+                    // Therefore, we need to handle this case by checking the 
deathException here and rethrow it.
+                    throw deathException;
+                }
             }
         } catch (Throwable t) {
-            log.error("[{}] Uncaught exception in Java Instance", 
FunctionCommon.getFullyQualifiedInstanceId(
-                    instanceConfig.getFunctionDetails().getTenant(),
-                    instanceConfig.getFunctionDetails().getNamespace(),
-                    instanceConfig.getFunctionDetails().getName(),
-                    instanceConfig.getInstanceId()), t);
-            deathException = t;
+            if (deathException != null) {
+                log.error("[{}] Fatal exception occurred in the instance", 
FunctionCommon.getFullyQualifiedInstanceId(
+                        instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName(),
+                        instanceConfig.getInstanceId()), deathException);
+            } else {
+                log.error("[{}] Uncaught exception in Java Instance", 
FunctionCommon.getFullyQualifiedInstanceId(
+                        instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName(),
+                        instanceConfig.getInstanceId()), t);
+                deathException = t;
+            }
             if (stats != null) {
-                stats.incrSysExceptions(t);
+                stats.incrSysExceptions(deathException);
             }
         } finally {
             log.info("Closing instance");
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 90f7df37fa1..6516b9284c9 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -120,7 +121,7 @@ public class ContextImplTest {
             client,
             new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, t -> {});
         context.setCurrentMessageContext((Record<String>) () -> null);
     }
 
@@ -234,7 +235,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, t -> {});
         context.getPulsarAdmin();
     }
 
@@ -248,7 +249,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, t -> {});
         try {
             context.seek("z", 0, Mockito.mock(MessageId.class));
             Assert.fail("Expected exception");
@@ -279,7 +280,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, t -> {});
         Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
         
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
         context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -311,7 +312,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, t -> {});
         Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
         
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
         context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -335,7 +336,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder);
+                pulsarAdmin, clientBuilder, t -> {});
         ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
         
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
         ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
@@ -438,4 +439,23 @@ public class ContextImplTest {
         assertEquals(record.getProperties().get("prop-key"), "prop-value");
         assertNull(record.getValue());
     }
+
+    @Test
+    public void testFatal() {
+        Throwable fatalException = new Exception("test-fatal-exception");
+        AtomicBoolean fatalInvoked = new AtomicBoolean(false);
+        context = new ContextImpl(
+                config,
+                logger,
+                client,
+                new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
+                new String[0],
+                FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
+                pulsarAdmin, clientBuilder, t -> {
+                    assertEquals(t, fatalException);
+                    fatalInvoked.set(true);
+        });
+        context.fatal(fatalException);
+        assertTrue(fatalInvoked.get());
+    }
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 0ba1d24ba74..134e77a3b58 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -24,18 +24,22 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -46,8 +50,14 @@ import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import 
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+import org.awaitility.Awaitility;
 import org.jetbrains.annotations.NotNull;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -91,6 +101,23 @@ public class JavaInstanceRunnableTest {
         return javaInstanceRunnable;
     }
 
+    private JavaInstanceRunnable createRunnable(SourceSpec sourceSpec,
+                                                String functionClassName, 
SinkSpec sinkSpec)
+            throws PulsarClientException {
+        ClientBuilder clientBuilder = mock(ClientBuilder.class);
+        when(clientBuilder.build()).thenReturn(null);
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setSource(sourceSpec)
+                .setClassName(functionClassName)
+                .setSink(sinkSpec)
+                .build();
+        InstanceConfig config = createInstanceConfig(functionDetails);
+        config.setClusterName("test-cluster");
+        return new JavaInstanceRunnable(config, clientBuilder,
+                
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null, 
null, null, null, null,
+                Thread.currentThread().getContextClassLoader(), null);
+    }
+
     private Method makeAccessible(JavaInstanceRunnable javaInstanceRunnable) 
throws Exception {
         Method method =
                 
javaInstanceRunnable.getClass().getDeclaredMethod("setupSerDe", Class[].class, 
ClassLoader.class);
@@ -333,4 +360,137 @@ public class JavaInstanceRunnableTest {
                 .getBeanProperties(ConnectorTestConfig2.class);
         Assert.assertEquals(new TreeSet<>(beanProperties), new 
TreeSet<>(Arrays.asList("field1", "withGetter")));
     }
+
+    public static class TestSourceConnector implements Source<String> {
+
+        private LinkedBlockingQueue<Record<String>> queue;
+        private SourceContext context;
+
+        public void pushRecord(Record<String> record) throws Exception {
+            queue.put(record);
+        }
+
+        @Override
+        public void open(Map config, SourceContext sourceContext) throws 
Exception {
+            context = sourceContext;
+            queue = new LinkedBlockingQueue<>();
+        }
+
+        @Override
+        public Record<String> read() throws Exception {
+            return queue.take();
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+
+        public void fatalConnector() {
+            context.fatal(new 
Exception(FailComponentType.FAIL_SOURCE.toString()));
+        }
+    }
+
+    public static class TestFunction implements Function<String, 
CompletableFuture<String>> {
+        @Override
+        public CompletableFuture<String> process(String input, Context 
context) throws Exception {
+            CompletableFuture<String> future = new CompletableFuture<>();
+            new Thread(() -> {
+                if (FailComponentType.FAIL_FUNC.toString().equals(input)) {
+                    context.fatal(new 
Exception(FailComponentType.FAIL_FUNC.toString()));
+                } else {
+                    future.complete(input);
+                }
+            }).start();
+            return future;
+        }
+    }
+
+    public static class TestSinkConnector implements Sink<String> {
+        SinkContext context;
+
+        @Override
+        public void open(Map config, SinkContext sinkContext) throws Exception 
{
+            this.context = sinkContext;
+        }
+
+        @Override
+        public void write(Record<String> record) throws Exception {
+            new Thread(() -> {
+                if 
(FailComponentType.FAIL_SINK.toString().equals(record.getValue())) {
+                    context.fatal(new 
Exception(FailComponentType.FAIL_SINK.toString()));
+                }
+            }).start();
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+
+    private Object getPrivateField(JavaInstanceRunnable javaInstanceRunnable, 
String fieldName)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = JavaInstanceRunnable.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(javaInstanceRunnable);
+    }
+
+    public enum FailComponentType {
+        FAIL_SOURCE,
+        FAIL_FUNC,
+        FAIL_SINK
+    }
+
+    @DataProvider(name = "failComponentType")
+    public Object[][] failType() {
+        return new Object[][]{{FailComponentType.FAIL_SOURCE}, 
{FailComponentType.FAIL_FUNC},
+                {FailComponentType.FAIL_SINK}};
+    }
+
+    @Test(dataProvider = "failComponentType")
+    public void testFatalTheInstance(FailComponentType failComponentType) 
throws Exception {
+        JavaInstanceRunnable javaInstanceRunnable = createRunnable(
+                SourceSpec.newBuilder()
+                        
.setClassName(TestSourceConnector.class.getName()).build(),
+                TestFunction.class.getName(),
+                
SinkSpec.newBuilder().setClassName(TestSinkConnector.class.getName()).build()
+        );
+
+        Thread fnThread = new Thread(javaInstanceRunnable);
+        fnThread.start();
+
+        // Wait for the setup to complete
+        AtomicReference<TestSourceConnector> source = new AtomicReference<>();
+        Awaitility.await()
+                .pollInterval(Duration.ofMillis(200))
+                .atMost(Duration.ofSeconds(10))
+                .ignoreExceptions().untilAsserted(() -> {
+                    TestSourceConnector sourceConnector = 
(TestSourceConnector) getPrivateField(javaInstanceRunnable,
+                            "source");
+                    Assert.assertNotNull(sourceConnector);
+                    source.set(sourceConnector);
+                });
+
+        // Fail the connector or function
+        if (failComponentType == FailComponentType.FAIL_SOURCE) {
+            source.get().fatalConnector();
+        } else {
+            source.get().pushRecord(failComponentType::toString);
+        }
+
+        // Assert that the instance is terminated with the fatal exception
+        Awaitility.await()
+                .pollInterval(Duration.ofMillis(200))
+                .atMost(Duration.ofSeconds(10))
+                .ignoreExceptions().untilAsserted(() -> {
+                    
Assert.assertNotNull(javaInstanceRunnable.getDeathException());
+                    
Assert.assertEquals(javaInstanceRunnable.getDeathException().getMessage(),
+                            failComponentType.toString());
+
+                    // Assert the java instance is closed
+                    Assert.assertFalse(fnThread.isAlive());
+                    Assert.assertFalse((boolean) 
getPrivateField(javaInstanceRunnable, "isInitialized"));
+                });
+    }
 }
diff --git 
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
 
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 52afac1a5ac..fd291a8a3c9 100644
--- 
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ 
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -259,12 +259,12 @@ public class IOConfigUtilsTest {
         public CompletableFuture<ByteBuffer> getStateAsync(String key) {
             return null;
         }
-        
+
         @Override
         public void deleteState(String key) {
-               
+
         }
-        
+
         @Override
         public CompletableFuture<Void> deleteStateAsync(String key) {
                return null;
@@ -284,6 +284,11 @@ public class IOConfigUtilsTest {
         public PulsarClient getPulsarClient() {
             return null;
         }
+
+        @Override
+        public void fatal(Throwable t) {
+
+        }
     }
 
     @Test
@@ -449,12 +454,12 @@ public class IOConfigUtilsTest {
         public CompletableFuture<ByteBuffer> getStateAsync(String key) {
             return null;
         }
-        
+
         @Override
         public void deleteState(String key) {
-               
+
         }
-        
+
         @Override
         public CompletableFuture<Void> deleteStateAsync(String key) {
                return null;
@@ -464,6 +469,11 @@ public class IOConfigUtilsTest {
         public PulsarClient getPulsarClient() {
             return null;
         }
+
+        @Override
+        public void fatal(Throwable t) {
+
+        }
     }
 
     @Test
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index d59cdb1d9b6..9537b6576b4 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -164,12 +164,12 @@ public class KafkaAbstractSinkTest {
             public CompletableFuture<ByteBuffer> getStateAsync(String key) {
                 return null;
             }
-            
+
             @Override
             public void deleteState(String key) {
-               
+
             }
-            
+
             @Override
             public CompletableFuture<Void> deleteStateAsync(String key) {
                return null;
@@ -179,6 +179,11 @@ public class KafkaAbstractSinkTest {
             public PulsarClient getPulsarClient() {
                 return null;
             }
+
+            @Override
+            public void fatal(Throwable t) {
+
+            }
         };
         ThrowingRunnable openAndClose = ()->{
             try {

Reply via email to