This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5fdb94eb9eb27178c4a8bca5eb82438b6056e895 Author: Matteo Merli <[email protected]> AuthorDate: Sat Nov 6 16:51:49 2021 -0700 Allow to configure different implementations for Pulsar functions state store (#12646) (cherry picked from commit 08a49c06bff4a52d26319a114961aed6cb6c4791) --- .../worker/PulsarFunctionLocalRunTest.java | 6 +- .../worker/PulsarFunctionMetadataStoreTest.java | 122 ++++++++++++++++++ pulsar-functions/instance/pom.xml | 6 + .../functions/instance/JavaInstanceRunnable.java | 15 ++- .../instance/state/BKStateStoreProviderImpl.java | 2 - .../state/PulsarMetadataStateStoreImpl.java | 142 +++++++++++++++++++++ .../PulsarMetadataStateStoreProviderImpl.java | 67 ++++++++++ .../instance/state/StateStoreProvider.java | 2 + .../instance/JavaInstanceRunnableTest.java | 2 +- .../state/PulsarMetadataStateStoreImplTest.java | 120 +++++++++++++++++ .../org/apache/pulsar/functions/LocalRunner.java | 9 +- .../functions/runtime/JavaInstanceStarter.java | 4 + .../functions/runtime/thread/ThreadRuntime.java | 5 +- .../runtime/thread/ThreadRuntimeFactory.java | 18 ++- .../pulsar/functions/worker/WorkerConfig.java | 8 ++ .../worker/rest/api/FunctionsImplTest.java | 4 +- 16 files changed, 515 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index ed6c4aa..13ac623 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -304,7 +304,7 @@ public class PulsarFunctionLocalRunTest { } } - private WorkerConfig createWorkerConfig(ServiceConfiguration config) { + protected WorkerConfig createWorkerConfig(ServiceConfiguration config) { System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath()); @@ -560,7 +560,7 @@ public class PulsarFunctionLocalRunTest { } } - private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception { + protected void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception { testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1); } @@ -1133,7 +1133,7 @@ public class PulsarFunctionLocalRunTest { } } - private void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable { + protected void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(pulsarApiExamplesClassLoader); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java new file mode 100644 index 0000000..715837f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; +import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.common.functions.ConsumerConfig; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Utils; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.LocalRunner; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; +import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test Pulsar sink on function + */ +@Slf4j +@Test +public class PulsarFunctionMetadataStoreTest extends PulsarFunctionLocalRunTest { + + + protected WorkerConfig createWorkerConfig(ServiceConfiguration config) { + WorkerConfig wc = super.createWorkerConfig(config); + wc.setStateStorageProviderImplementation(PulsarMetadataStateStoreProviderImpl.class.getName()); + wc.setStateStorageServiceUrl("memory://local"); + return wc; + } + + @Test + public void testE2EPulsarFunctionLocalRun() throws Throwable { + runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null)); + } +} diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index a4cfd49..67318a8 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -55,6 +55,12 @@ <dependency> <groupId>${project.groupId}</groupId> + <artifactId>pulsar-metadata</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>pulsar-io-core</artifactId> <version>${project.version}</version> </dependency> diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index bbc7ced..ce8c9fc 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -104,6 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private LogAppender logAppender; // provide tables for storing states + private final String stateStorageImplClass; private final String stateStorageServiceUrl; private StateStoreProvider stateStoreProvider; private StateManager stateManager; @@ -145,6 +146,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { ClientBuilder clientBuilder, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, + String stateStorageImplClass, String stateStorageServiceUrl, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, @@ -153,6 +155,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { this.clientBuilder = clientBuilder; this.client = (PulsarClientImpl) pulsarClient; this.pulsarAdmin = pulsarAdmin; + this.stateStorageImplClass = stateStorageImplClass; this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; this.functionClassLoader = functionClassLoader; @@ -323,8 +326,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { if (null == stateStorageServiceUrl) { stateStoreProvider = StateStoreProvider.NULL; } else { - stateStoreProvider = new BKStateStoreProviderImpl(); - Map<String, Object> stateStoreProviderConfig = new HashMap(); + stateStoreProvider = getStateStoreProvider(); + Map<String, Object> stateStoreProviderConfig = new HashMap<>(); stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl); stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails()); @@ -340,6 +343,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } + private StateStoreProvider getStateStoreProvider() throws Exception { + if (stateStorageImplClass == null) { + return new BKStateStoreProviderImpl(); + } else { + return (StateStoreProvider) Class.forName(stateStorageImplClass).getConstructor().newInstance(); + } + } + private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception { if (result.getUserException() != null) { Exception t = result.getUserException(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java index fd4228a..32901bd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java @@ -56,8 +56,6 @@ import org.apache.pulsar.functions.utils.FunctionCommon; @Slf4j public class BKStateStoreProviderImpl implements StateStoreProvider { - public static final String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl"; - private String stateStorageServiceUrl; private Map<String, StorageClient> clients; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java new file mode 100644 index 0000000..1c1df7f --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.state; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.functions.api.StateStoreContext; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException; + +public class PulsarMetadataStateStoreImpl implements DefaultStateStore { + + private final MetadataStore store; + private final String prefixPath; + private final MetadataCache<Long> countersCache; + + private final String namespace; + private final String tenant; + private final String name; + private final String fqsn; + + PulsarMetadataStateStoreImpl(MetadataStore store, String prefix, String tenant, String namespace, String name) { + this.store = store; + this.tenant = tenant; + this.namespace = namespace; + this.name = name; + this.fqsn = tenant + '/' + namespace + '/' + name; + + this.prefixPath = prefix + '/' + fqsn + '/'; + this.countersCache = store.getMetadataCache(Long.class); + } + + @Override + public String tenant() { + return tenant; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public String name() { + return name; + } + + @Override + public String fqsn() { + return fqsn; + } + + @Override + public void init(StateStoreContext ctx) { + } + + @Override + public void close() { + } + + @Override + public void put(String key, ByteBuffer value) { + putAsync(key, value).join(); + } + + @Override + public CompletableFuture<Void> putAsync(String key, ByteBuffer value) { + byte[] bytes = new byte[value.remaining()]; + value.get(bytes); + return store.put(getPath(key), bytes, Optional.empty()) + .thenApply(__ -> null); + } + + @Override + public void delete(String key) { + deleteAsync(key).join(); + } + + @Override + public CompletableFuture<Void> deleteAsync(String key) { + return store.delete(getPath(key), Optional.empty()); + } + + @Override + public ByteBuffer get(String key) { + return getAsync(key).join(); + } + + @Override + public CompletableFuture<ByteBuffer> getAsync(String key) { + return store.get(getPath(key)) + .thenApply(optRes -> + optRes.map(x -> ByteBuffer.wrap(x.getValue())) + .orElse(null)); + } + + @Override + public void incrCounter(String key, long amount) { + incrCounterAsync(key, amount); + } + + @Override + public CompletableFuture<Void> incrCounterAsync(String key, long amount) { + return countersCache.readModifyUpdateOrCreate(getPath(key), optValue -> + optValue.orElse(0L) + amount + ).thenApply(__ -> null); + } + + @Override + public long getCounter(String key) { + return getCounterAsync(key).join(); + } + + @Override + public CompletableFuture<Long> getCounterAsync(String key) { + return countersCache.get(getPath(key)) + .thenApply(optValue -> optValue.orElse(0L)); + } + + private String getPath(String key) { + return prefixPath + key; + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java new file mode 100644 index 0000000..819bfd9 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.state; + +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreFactory; + +public class PulsarMetadataStateStoreProviderImpl implements StateStoreProvider { + + private static final String METADATA_URL = "METADATA_URL"; + private static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE"; + + private static final String METADATA_PREFIX = "METADATA_PREFIX"; + private static final String METADATA_DEFAULT_PREFIX = "/state-store"; + + private MetadataStore store; + private String prefix; + private boolean shouldCloseStore; + + @Override + public void init(Map<String, Object> config, Function.FunctionDetails functionDetails) throws Exception { + + prefix = (String) config.getOrDefault(METADATA_PREFIX, METADATA_DEFAULT_PREFIX); + + if (config.containsKey(METADATA_STORE_INSTANCE)) { + store = (MetadataStore) config.get(METADATA_STORE_INSTANCE); + shouldCloseStore = false; + } else { + String metadataUrl = (String) config.get(METADATA_URL); + store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build()); + shouldCloseStore = true; + } + } + + @Override + public DefaultStateStore getStateStore(String tenant, String namespace, String name) throws Exception { + return new PulsarMetadataStateStoreImpl(store, prefix, tenant, namespace, name); + } + + @SneakyThrows + @Override + public void close() { + if (shouldCloseStore) { + store.close(); + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java index db3c6b3..4c01e9d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java @@ -27,6 +27,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; */ public interface StateStoreProvider extends AutoCloseable { + String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl"; + /** * The state store provider returns `null` state stores. */ diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 59a4f5b..b96e813 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -71,7 +71,7 @@ public class JavaInstanceRunnableTest { ClientBuilder clientBuilder = mock(ClientBuilder.class); when(clientBuilder.build()).thenReturn(null); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - config, clientBuilder, null, null, null, null, null, null); + config, clientBuilder, null, null, null,null, null, null, null); return javaInstanceRunnable; } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java new file mode 100644 index 0000000..a6fb3eb --- /dev/null +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.state; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.*; +import io.kubernetes.client.proto.Meta; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import lombok.SneakyThrows; +import org.apache.bookkeeper.api.kv.Table; +import org.apache.bookkeeper.api.kv.options.Options; +import org.apache.bookkeeper.api.kv.result.DeleteResult; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test {@link BKStateStoreImpl}. + */ +public class PulsarMetadataStateStoreImplTest { + + private static final String TENANT = "test-tenant"; + private static final String NS = "test-ns"; + private static final String NAME = "test-name"; + private static final String FQSN = "test-tenant/test-ns/test-name"; + private static final String PREFIX = "/prefix"; + private static final String PREFIX_PATH = PREFIX + '/' + FQSN + '/'; + + private MetadataStore store; + private MetadataCache<Long> countersCache; + private DefaultStateStore stateContext; + + @BeforeMethod + public void setup() throws Exception { + this.store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build()); + this.countersCache = store.getMetadataCache(Long.class); + this.stateContext = new PulsarMetadataStateStoreImpl(store, "/prefix", TENANT, NS, NAME); + } + + @AfterMethod + public void cleanup() throws Exception { + this.store.close(); + } + + @Test + public void testGetter() { + assertEquals(stateContext.tenant(), TENANT); + assertEquals(stateContext.namespace(), NS); + assertEquals(stateContext.name(), NAME); + assertEquals(stateContext.fqsn(), FQSN); + } + + @Test + public void testIncr() throws Exception { + stateContext.incrCounter("test-key", 10L); + assertEquals(countersCache.get(PREFIX_PATH + "test-key").join().get().longValue(), 10); + } + + @Test + public void testPut() throws Exception { + stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))); + assertEquals(store.get(PREFIX_PATH + "test-key").join().get().getValue(), "test-value".getBytes(UTF_8)); + } + + @Test + public void testDelete() throws Exception { + stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))); + assertEquals("test-value".getBytes(UTF_8), store.get(PREFIX_PATH + "test-key").join().get().getValue()); + stateContext.delete("test-key"); + assertFalse(store.get(PREFIX_PATH + "test-key").join().isPresent()); + } + + @Test + public void testGetAmount() throws Exception { + assertEquals(stateContext.getCounter("test-key"), 0); + stateContext.incrCounter("test-key", 10L); + assertEquals(countersCache.get(PREFIX_PATH + "test-key").join().get().longValue(), 10); + assertEquals(stateContext.getCounter("test-key"), 10); + } + + @Test + public void testGetKeyNotPresent() throws Exception { + CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key"); + assertTrue(result != null); + assertEquals(result.get(), null); + } + +} diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 0bedb74..295199c 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -151,6 +151,8 @@ public class LocalRunner implements AutoCloseable { protected SourceConfig sourceConfig; @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class) protected SinkConfig sinkConfig; + @Parameter(names = "--stateStorageImplClass", description = "The implemenatation class state storage service (by default Apache BookKeeper)", hidden = true, required = false) + protected String stateStorageImplClass; @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true) protected String stateStorageServiceUrl; @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true) @@ -201,8 +203,9 @@ public class LocalRunner implements AutoCloseable { } @Builder - public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, SinkConfig sinkConfig, String - stateStorageServiceUrl, String brokerServiceUrl, String clientAuthPlugin, String clientAuthParams, + public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, SinkConfig sinkConfig, + String stateStorageImplClass, String stateStorageServiceUrl, String brokerServiceUrl, + String clientAuthPlugin, String clientAuthParams, boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled, String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv, String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory, @@ -210,6 +213,7 @@ public class LocalRunner implements AutoCloseable { this.functionConfig = functionConfig; this.sourceConfig = sourceConfig; this.sinkConfig = sinkConfig; + this.stateStorageImplClass = stateStorageImplClass; this.stateStorageServiceUrl = stateStorageServiceUrl; this.brokerServiceUrl = brokerServiceUrl; this.clientAuthPlugin = clientAuthPlugin; @@ -614,6 +618,7 @@ public class LocalRunner implements AutoCloseable { } runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", serviceUrl, + stateStorageImplClass, stateStorageServiceUrl, authConfig, secretsProvider, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index 1881b55..5995b87 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -94,6 +94,9 @@ public class JavaInstanceStarter implements AutoCloseable { @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") public String tlsTrustCertFilePath; + @Parameter(names = "--state_storage_impl_class", description = "State Storage Service Implementation class\n", required= false) + public String stateStorageImplClass; + @Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false) public String stateStorageServiceUrl; @@ -196,6 +199,7 @@ public class JavaInstanceStarter implements AutoCloseable { RuntimeUtils.registerDefaultCollectors(collectorRegistry); containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl, + stateStorageImplClass, stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin) .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls)) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index be43bb0..950f48b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import io.prometheus.client.CollectorRegistry; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarServerException; @@ -68,6 +67,7 @@ public class ThreadRuntime implements Runtime { private ClientBuilder clientBuilder; private PulsarClient pulsarClient; private PulsarAdmin pulsarAdmin; + private String stateStorageImplClass; private String stateStorageServiceUrl; private SecretsProvider secretsProvider; private FunctionCollectorRegistry collectorRegistry; @@ -81,6 +81,7 @@ public class ThreadRuntime implements Runtime { PulsarClient client, ClientBuilder clientBuilder, PulsarAdmin pulsarAdmin, + String stateStorageImplClass, String stateStorageServiceUrl, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, @@ -97,6 +98,7 @@ public class ThreadRuntime implements Runtime { this.clientBuilder = clientBuilder; this.pulsarClient = client; this.pulsarAdmin = pulsarAdmin; + this.stateStorageImplClass = stateStorageImplClass; this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; @@ -186,6 +188,7 @@ public class ThreadRuntime implements Runtime { clientBuilder, pulsarClient, pulsarAdmin, + stateStorageImplClass, stateStorageServiceUrl, secretsProvider, collectorRegistry, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index 864a067..1e8c96a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -63,6 +63,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private ClientBuilder clientBuilder; private PulsarClient pulsarClient; private PulsarAdmin pulsarAdmin; + private String stateStorageImplClass; private String storageServiceUrl; private SecretsProvider defaultSecretsProvider; private FunctionCollectorRegistry collectorRegistry; @@ -76,21 +77,27 @@ public class ThreadRuntimeFactory implements RuntimeFactory { * This constructor is used by other runtimes (e.g. ProcessRuntime and KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of the function. * When used by other runtimes, the arguments such as secretsProvider and rootClassLoader will be provided. */ - public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, + public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, + String stateStorageImplClass, + String storageServiceUrl, AuthenticationConfig authConfig, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws Exception { initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig, - storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, + stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry, + narExtractionDirectory, rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty()); } - private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl, + private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, + String pulsarServiceUrl, AuthenticationConfig authConfig, String stateStorageImplClass, + String storageServiceUrl, SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, - String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) throws PulsarClientException { + String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) + throws PulsarClientException { if (rootClassLoader == null) { rootClassLoader = Thread.currentThread().getContextClassLoader(); @@ -104,6 +111,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { this.pulsarAdmin = exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null; this.clientBuilder = InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authConfig, calculateClientMemoryLimit(memoryLimit)); this.pulsarClient = this.clientBuilder.build(); + this.stateStorageImplClass = stateStorageImplClass; this.storageServiceUrl = storageServiceUrl; this.collectorRegistry = collectorRegistry; this.narExtractionDirectory = narExtractionDirectory; @@ -153,6 +161,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { initialize(factoryConfig.getThreadGroupName(), Optional.ofNullable(factoryConfig.getPulsarClientMemoryLimit()), workerConfig.getPulsarServiceUrl(), authenticationConfig, + workerConfig.getStateStorageProviderImplementation(), workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null, null, workerConfig.getNarExtractionDirectory(), null, workerConfig.isExposeAdminClientEnabled(), workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager)); @@ -179,6 +188,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { pulsarClient, clientBuilder, pulsarAdmin, + stateStorageImplClass, storageServiceUrl, secretsProvider, collectorRegistry, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 0cf6a22..eb40ae3 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -50,6 +50,7 @@ import lombok.Data; import lombok.experimental.Accessors; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider; +import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl; import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig; import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig; @@ -234,6 +235,13 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The service URL of state storage" ) private String stateStorageServiceUrl; + + @FieldContext( + category = CATEGORY_STATE, + doc = "The implementation class for the state store" + ) + private String stateStorageProviderImplementation = BKStateStoreProviderImpl.class.getName(); + @FieldContext( category = CATEGORY_FUNC_RUNTIME_MNG, doc = "The Pulsar topic used for storing function assignment informations" diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 4bea15e..e2bb784 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -167,7 +167,7 @@ public class FunctionsImplTest { instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null, null, null); CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>(); metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class); @@ -222,7 +222,7 @@ public class FunctionsImplTest { instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null, null, null); CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>(); completableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class);
