This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a2f8aa8 Move functions to use V2 style namespaces for internal data keeping (#1742) a2f8aa8 is described below commit a2f8aa86943de11da944250ceafd470f26421699 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Mon May 7 16:07:31 2018 -0700 Move functions to use V2 style namespaces for internal data keeping (#1742) * Move functions to use V2 style namespaces for internal data keeping * Incorporate feedback --- conf/functions_worker.yml | 3 ++- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 9 +++++---- .../src/main/java/org/apache/pulsar/functions/worker/Worker.java | 3 +++ .../java/org/apache/pulsar/functions/worker/WorkerConfig.java | 1 + 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index ff9d231..663c4f7 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -23,7 +23,8 @@ workerPort: 6750 functionMetadataTopicName: metadata functionMetadataSnapshotsTopicPath: snapshots clusterCoordinationTopicName: coordinate -pulsarFunctionsNamespace: sample/standalone/functions +pulsarFunctionsNamespace: public/functions +pulsarFunctionsCluster: standalone pulsarServiceUrl: pulsar://localhost:6650 pulsarWebServiceUrl: http://localhost:8080 numFunctionPackageReplicas: 1 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 38d8e84..7b975f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -30,9 +30,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -81,6 +79,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -852,7 +851,7 @@ public class PulsarService implements AutoCloseable { .getWorkerConfig().getPulsarFunctionsNamespace(); String[] a = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/"); String property = a[0]; - String cluster = a[1]; + String cluster = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster(); /* multiple brokers may be trying to create the property, cluster, and namespace @@ -904,6 +903,8 @@ public class PulsarService implements AutoCloseable { // create namespace for function worker service try { Policies policies = new Policies(); + policies.retention_policies = new RetentionPolicies(-1, -1); + policies.replication_clusters = Collections.singleton(functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster()); int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles(); policies.bundles = getBundles(defaultNumberOfBundles); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 469cdf0..4f700ab 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -31,6 +31,7 @@ import org.apache.pulsar.functions.worker.rest.WorkerServer; import javax.ws.rs.core.Response; import java.io.IOException; import java.net.URI; +import java.util.HashSet; @Slf4j public class Worker extends AbstractService { @@ -104,6 +105,8 @@ public class Worker extends AbstractService { try { Policies policies = new Policies(); policies.retention_policies = new RetentionPolicies(-1, -1); + policies.replication_clusters = new HashSet<>(); + policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster()); admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), policies); } catch (PulsarAdminException e1) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 06f4e53..161d210 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -51,6 +51,7 @@ public class WorkerConfig implements Serializable { private String clusterCoordinationTopicName; private String functionMetadataSnapshotsTopicPath; private String pulsarFunctionsNamespace; + private String pulsarFunctionsCluster; private int numFunctionPackageReplicas; private String downloadDirectory; private long snapshotFreqMs; -- To stop receiving notification emails like this one, please contact mme...@apache.org.