sijie closed pull request #3017: [state][standalone] Fix bugs on running 
function with multiple instances in standalone
URL: https://github.com/apache/pulsar/pull/3017
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8ede5515fe..3a54455595 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,7 @@
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
+import javax.swing.tree.ExpandVetoException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -290,6 +291,41 @@ private void loadJars() throws Exception {
         Thread.currentThread().setContextClassLoader(fnClassLoader);
     }
 
+    private void createStateTable(String tableNs, String tableName, 
StorageClientSettings settings) throws Exception {
+        try (StorageAdminClient storageAdminClient = 
StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin()) {
+            StreamConfiguration streamConf = 
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setInitialNumRanges(4)
+                .setMinNumRanges(4)
+                .setStorageType(StorageType.TABLE)
+                .build();
+            while (true) {
+                try {
+                    result(storageAdminClient.getStream(tableNs, tableName));
+                    return;
+                } catch (NamespaceNotFoundException nnfe) {
+                    try {
+                        result(storageAdminClient.createNamespace(tableNs, 
NamespaceConfiguration.newBuilder()
+                            .setDefaultStreamConf(streamConf)
+                            .build()));
+                        result(storageAdminClient.createStream(tableNs, 
tableName, streamConf));
+                    } catch (Exception e) {
+                        // there might be two clients conflicting at creating 
table, so let's retrieve the table again
+                        // to make sure the table is created.
+                    }
+                } catch (StreamNotFoundException snfe) {
+                    try {
+                        result(storageAdminClient.createStream(tableNs, 
tableName, streamConf));
+                    } catch (Exception e) {
+                        // there might be two client conflicting at creating 
table, so let's retrieve it to make
+                        // sure the table is created.
+                    }
+                }
+            }
+        }
+    }
+
     private void setupStateTable() throws Exception {
         if (null == stateStorageServiceUrl) {
             return;
@@ -307,25 +343,7 @@ private void setupStateTable() throws Exception {
                 .build();
 
         // we defer creation of the state table until a java instance is 
running here.
-        try (StorageAdminClient storageAdminClient = 
StorageClientBuilder.newBuilder()
-                .withSettings(settings)
-                .buildAdmin()) {
-            StreamConfiguration streamConf = 
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
-                .setInitialNumRanges(4)
-                .setMinNumRanges(4)
-                .setStorageType(StorageType.TABLE)
-                .build();
-            try {
-                result(storageAdminClient.getStream(tableNs, tableName));
-            } catch (NamespaceNotFoundException nnfe) {
-                result(storageAdminClient.createNamespace(tableNs, 
NamespaceConfiguration.newBuilder()
-                        .setDefaultStreamConf(streamConf)
-                        .build()));
-                result(storageAdminClient.createStream(tableNs, tableName, 
streamConf));
-            } catch (StreamNotFoundException snfe) {
-                result(storageAdminClient.createStream(tableNs, tableName, 
streamConf));
-            }
-        }
+        createStateTable(tableNs, tableName, settings);
 
         log.info("Starting state table for function {}", 
instanceConfig.getFunctionDetails().getName());
         this.storageClient = StorageClientBuilder.newBuilder()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to