This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4f1b53c [state][standalone] When running with more than one function instance, Function instances will crash because of table service throws INTERNAL_SERVER_ERROR (#3017) 4f1b53c is described below commit 4f1b53c88638589a66c52c6320fd1ed6dddd77be Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Tue Nov 20 08:52:33 2018 -0800 [state][standalone] When running with more than one function instance, Function instances will crash because of table service throws INTERNAL_SERVER_ERROR (#3017) *Motivation* When running with more than one function instance, Function instances will crash because of table service throws INTERNAL_SERVER_ERROR ``` org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.ClientException: fail to access its root range : code = INTERNAL_SERVER_ERROR at org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createRootRangeException(ProtocolInternalUtils.java:105) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.RootRangeClientImpl.processCreateNamespaceResponse(RootRangeClientImpl.java:129) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.RootRangeClientImpl.lambda$createNamespace$4(RootRangeClientImpl.java:118) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.RpcUtils$1.onSuccess(RpcUtils.java:78) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.Futures$4.run(Futures.java:1135) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399) ~[java-instance.jar:2.3.0-SNAPSHOT] at 
org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:902) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:813) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:655) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:487) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:465) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:684) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[java-instance.jar:2.3.0-SNAPSHOT] at 
org.apache.pulsar.functions.runtime.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:391) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:471) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:553) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:474) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:591) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[java-instance.jar:2.3.0-SNAPSHOT] at org.apache.pulsar.functions.runtime.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[java-instance.jar:2.3.0-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_144] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_144] ``` In the current version of BK, `INTERNAL_SERVER_ERROR` is thrown when a client attempts to create a table that already exists. In a newer version, the client handles that better. 
*Changes* A temp fix before upgrading BK to 4.9 is to handle creating state tables in a more reliable way. --- .../functions/instance/JavaInstanceRunnable.java | 56 ++++++++++++++-------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 8ede551..3a54455 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Summary; +import javax.swing.tree.ExpandVetoException; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -290,6 +291,41 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Thread.currentThread().setContextClassLoader(fnClassLoader); } + private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception { + try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder() + .withSettings(settings) + .buildAdmin()) { + StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) + .setInitialNumRanges(4) + .setMinNumRanges(4) + .setStorageType(StorageType.TABLE) + .build(); + while (true) { + try { + result(storageAdminClient.getStream(tableNs, tableName)); + return; + } catch (NamespaceNotFoundException nnfe) { + try { + result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder() + .setDefaultStreamConf(streamConf) + .build())); + result(storageAdminClient.createStream(tableNs, tableName, streamConf)); + } catch (Exception e) { + // there might 
be two clients conflicting at creating table, so let's retrieve the table again + // to make sure the table is created. + } + } catch (StreamNotFoundException snfe) { + try { + result(storageAdminClient.createStream(tableNs, tableName, streamConf)); + } catch (Exception e) { + // there might be two client conflicting at creating table, so let's retrieve it to make + // sure the table is created. + } + } + } + } + } + private void setupStateTable() throws Exception { if (null == stateStorageServiceUrl) { return; @@ -307,25 +343,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { .build(); // we defer creation of the state table until a java instance is running here. - try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .buildAdmin()) { - StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) - .setInitialNumRanges(4) - .setMinNumRanges(4) - .setStorageType(StorageType.TABLE) - .build(); - try { - result(storageAdminClient.getStream(tableNs, tableName)); - } catch (NamespaceNotFoundException nnfe) { - result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder() - .setDefaultStreamConf(streamConf) - .build())); - result(storageAdminClient.createStream(tableNs, tableName, streamConf)); - } catch (StreamNotFoundException snfe) { - result(storageAdminClient.createStream(tableNs, tableName, streamConf)); - } - } + createStateTable(tableNs, tableName, settings); log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName()); this.storageClient = StorageClientBuilder.newBuilder()