This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f1b53c  [state][standalone] When running with more than one function 
instance, Function instances will crash because of table service throws 
INTERNAL_SERVER_ERROR (#3017)
4f1b53c is described below

commit 4f1b53c88638589a66c52c6320fd1ed6dddd77be
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Nov 20 08:52:33 2018 -0800

    [state][standalone] When running with more than one function instance, 
Function instances will crash because of table service throws 
INTERNAL_SERVER_ERROR (#3017)
    
    *Motivation*
    
    When running with more than one function instance, the function instances 
crash because the table service throws INTERNAL_SERVER_ERROR:
    
    ```
    
org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.ClientException:
 fail to access its root range : code = INTERNAL_SERVER_ERROR
            at 
org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createRootRangeException(ProtocolInternalUtils.java:105)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.RootRangeClientImpl.processCreateNamespaceResponse(RootRangeClientImpl.java:129)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.RootRangeClientImpl.lambda$createNamespace$4(RootRangeClientImpl.java:118)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.RpcUtils$1.onSuccess(RpcUtils.java:78)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.Futures$4.run(Futures.java:1135)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:902)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:813)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:655)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:487)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:465)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:684)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:391)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:471)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:553)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:474)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:591)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
 ~[java-instance.jar:2.3.0-SNAPSHOT]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_144]
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_144]
    
    ```
    
    In the current version of BookKeeper, `INTERNAL_SERVER_ERROR` is thrown when a 
client attempts to create a table that already exists. Newer versions of the 
client handle this case better.
    
    *Changes*
    
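    As a temporary fix until BookKeeper is upgraded to 4.9, create state tables 
more reliably: if creating the namespace or table fails (for example because 
another instance created it concurrently), look the table up again and retry 
instead of crashing the function instance.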
---
 .../functions/instance/JavaInstanceRunnable.java   | 56 ++++++++++++++--------
 1 file changed, 37 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8ede551..3a54455 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
+import javax.swing.tree.ExpandVetoException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -290,6 +291,41 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         Thread.currentThread().setContextClassLoader(fnClassLoader);
     }
 
+    /**
+     * Ensures the state table {@code tableNs/tableName} exists, creating its namespace and/or stream if needed.
+     *
+     * <p>Several function instances may race to create the same table. With the current BookKeeper client, a
+     * create call that loses this race can fail (e.g. with INTERNAL_SERVER_ERROR) instead of reporting that the
+     * table already exists. A failed create is therefore not fatal: after a short, capped, exponentially growing
+     * delay the table is looked up again. The method returns once {@code getStream} succeeds.
+     *
+     * @param tableNs   namespace that holds the state table
+     * @param tableName name of the state table (stream) inside {@code tableNs}
+     * @param settings  settings used to build the storage admin client
+     * @throws Exception if looking up the table fails with anything other than namespace/stream-not-found,
+     *                   or if the thread is interrupted while creating the table or backing off
+     */
+    private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
+        try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin()) {
+            StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setInitialNumRanges(4)
+                .setMinNumRanges(4)
+                .setStorageType(StorageType.TABLE)
+                .build();
+            // Delay before re-checking after a failed create. It doubles up to the cap, so a create that keeps
+            // failing is visible in the logs and does not spin in a tight loop against the table service.
+            long backoffMs = 100L;
+            final long maxBackoffMs = 5000L;
+            while (true) {
+                boolean namespaceMissing;
+                try {
+                    result(storageAdminClient.getStream(tableNs, tableName));
+                    return;
+                } catch (NamespaceNotFoundException nnfe) {
+                    namespaceMissing = true;
+                } catch (StreamNotFoundException snfe) {
+                    namespaceMissing = false;
+                }
+                try {
+                    if (namespaceMissing) {
+                        result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
+                            .setDefaultStreamConf(streamConf)
+                            .build()));
+                    }
+                    result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+                    // On success, fall through: the next iteration's getStream confirms the table and returns.
+                } catch (Exception e) {
+                    if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
+                        // Never swallow an interrupt; give up and let the caller shut down.
+                        throw e;
+                    }
+                    // There might be several instances racing to create the same table, so look the table up
+                    // again (after backing off) to make sure it gets created.
+                    log.warn("Failed to create state table {}/{}, checking again in {} ms",
+                        tableNs, tableName, backoffMs, e);
+                    Thread.sleep(backoffMs);
+                    backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
+                }
+            }
+        }
+    }
+
     private void setupStateTable() throws Exception {
         if (null == stateStorageServiceUrl) {
             return;
@@ -307,25 +343,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 .build();
 
         // we defer creation of the state table until a java instance is 
running here.
-        try (StorageAdminClient storageAdminClient = 
StorageClientBuilder.newBuilder()
-                .withSettings(settings)
-                .buildAdmin()) {
-            StreamConfiguration streamConf = 
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
-                .setInitialNumRanges(4)
-                .setMinNumRanges(4)
-                .setStorageType(StorageType.TABLE)
-                .build();
-            try {
-                result(storageAdminClient.getStream(tableNs, tableName));
-            } catch (NamespaceNotFoundException nnfe) {
-                result(storageAdminClient.createNamespace(tableNs, 
NamespaceConfiguration.newBuilder()
-                        .setDefaultStreamConf(streamConf)
-                        .build()));
-                result(storageAdminClient.createStream(tableNs, tableName, 
streamConf));
-            } catch (StreamNotFoundException snfe) {
-                result(storageAdminClient.createStream(tableNs, tableName, 
streamConf));
-            }
-        }
+        createStateTable(tableNs, tableName, settings);
 
         log.info("Starting state table for function {}", 
instanceConfig.getFunctionDetails().getName());
         this.storageClient = StorageClientBuilder.newBuilder()

Reply via email to