This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ebfbf5b Improve function state workflow with timeouts (#8528) ebfbf5b is described below commit ebfbf5bf8a77f14c1192132889b01ba14ba2fe76 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Nov 12 09:18:06 2020 -0800 Improve function state workflow with timeouts (#8528) Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../functions/instance/JavaInstanceRunnable.java | 65 ++++++++++++---------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 6b27868..fd4697c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -19,20 +19,20 @@ package org.apache.pulsar.functions.instance; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; + import com.google.common.base.Stopwatch; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; +import org.apache.bookkeeper.clients.SimpleStorageClientImpl; import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl; import org.apache.bookkeeper.clients.admin.StorageAdminClient; @@ -57,9 +57,9 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.functions.ConsumerConfig; -import org.apache.pulsar.common.functions.CryptoConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; +import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; @@ -73,7 +73,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; -import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.source.batch.BatchSourceExecutor; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.functions.utils.FunctionCommon; @@ -84,14 +83,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - -import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; +import java.util.concurrent.TimeoutException; /** * A function container implemented using java thread. @@ -250,7 +249,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { public void run() { try { setup(); - + while (true) { currentRecord = readInput(); @@ -335,29 +334,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return fnClassLoader; } - private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception { - try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl( - StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).build(), - ClientResources.create().scheduler())){ + private void createStateTableIfNotExist(String tableNs, String tableName, StorageClientSettings settings) throws Exception { + try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl( + settings, + ClientResources.create().scheduler())) { StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) - .setInitialNumRanges(4) - .setMinNumRanges(4) - .setStorageType(StorageType.TABLE) - .build(); + .setInitialNumRanges(4) + .setMinNumRanges(4) + .setStorageType(StorageType.TABLE) + .build(); Stopwatch elapsedWatch = Stopwatch.createStarted(); - while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) { + Exception lastException = null; + while (true) { try { - result(storageAdminClient.getStream(tableNs, tableName)); + result(storageAdminClient.getStream(tableNs, tableName), 30, TimeUnit.SECONDS); return; + } catch (TimeoutException e){ + lastException = e; } catch (NamespaceNotFoundException nnfe) { try { result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder() - .setDefaultStreamConf(streamConf) - .build())); + .setDefaultStreamConf(streamConf) + .build())); result(storageAdminClient.createStream(tableNs, tableName, streamConf)); } catch (Exception e) { // there might be two clients conflicting at creating table, so let's retrieve the table again // to make sure the table is created. + lastException = e; } } catch (StreamNotFoundException snfe) { try { @@ -365,12 +368,20 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } catch (Exception e) { // there might be two client conflicting at creating table, so let's retrieve it to make // sure the table is created. + lastException = e; } } catch (ClientException ce) { + lastException = ce; log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds", - ce.getMessage()); + ce.getMessage()); TimeUnit.MILLISECONDS.sleep(100); } + if (elapsedWatch.elapsed(TimeUnit.MINUTES) > 1) { + if (lastException != null) { + throw new RuntimeException("Failed to get or create state table within timeout", lastException); + } + throw new RuntimeException("Failed to get or create state table within timeout"); + } } } } @@ -399,13 +410,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { .build(); // we defer creation of the state table until a java instance is running here. - createStateTable(tableNs, tableName, settings); + createStateTableIfNotExist(tableNs, tableName, settings); log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName()); - this.storageClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .withNamespace(tableNs) - .build(); + this.storageClient = new SimpleStorageClientImpl(tableNs, settings); + // NOTE: this is a workaround until we bump bk version to 4.9.0 // table might just be created above, so it might not be ready for serving traffic Stopwatch openSw = Stopwatch.createStarted();