sijie closed pull request #3017: [state][standalone] Fix bugs on running function with multiple instances in standalone
URL: https://github.com/apache/pulsar/pull/3017
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8ede5515fe..3a54455595 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,7 @@
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
+import javax.swing.tree.ExpandVetoException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -290,6 +291,41 @@ private void loadJars() throws Exception {
         Thread.currentThread().setContextClassLoader(fnClassLoader);
     }
 
+    private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
+        try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin()) {
+            StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setInitialNumRanges(4)
+                .setMinNumRanges(4)
+                .setStorageType(StorageType.TABLE)
+                .build();
+            while (true) {
+                try {
+                    result(storageAdminClient.getStream(tableNs, tableName));
+                    return;
+                } catch (NamespaceNotFoundException nnfe) {
+                    try {
+                        result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
+                            .setDefaultStreamConf(streamConf)
+                            .build()));
+                        result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+                    } catch (Exception e) {
+                        // there might be two clients conflicting at creating table, so let's retrieve the table again
+                        // to make sure the table is created.
+                    }
+                } catch (StreamNotFoundException snfe) {
+                    try {
+                        result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+                    } catch (Exception e) {
+                        // there might be two client conflicting at creating table, so let's retrieve it to make
+                        // sure the table is created.
+                    }
+                }
+            }
+        }
+    }
+
     private void setupStateTable() throws Exception {
         if (null == stateStorageServiceUrl) {
             return;
@@ -307,25 +343,7 @@ private void setupStateTable() throws Exception {
             .build();
 
         // we defer creation of the state table until a java instance is running here.
-        try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin()) {
-            StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
-                .setInitialNumRanges(4)
-                .setMinNumRanges(4)
-                .setStorageType(StorageType.TABLE)
-                .build();
-            try {
-                result(storageAdminClient.getStream(tableNs, tableName));
-            } catch (NamespaceNotFoundException nnfe) {
-                result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
-                    .setDefaultStreamConf(streamConf)
-                    .build()));
-                result(storageAdminClient.createStream(tableNs, tableName, streamConf));
-            } catch (StreamNotFoundException snfe) {
-                result(storageAdminClient.createStream(tableNs, tableName, streamConf));
-            }
-        }
+        createStateTable(tableNs, tableName, settings);
 
         log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName());
         this.storageClient = StorageClientBuilder.newBuilder()

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services