[GitHub] skyrocknroll commented on issue #2792: support auto.offset.reset on the Pulsar adaptor for Apache Kafka
skyrocknroll commented on issue #2792: support auto.offset.reset on the Pulsar adaptor for Apache Kafka URL: https://github.com/apache/pulsar/issues/2792#issuecomment-433592430 @ivankelly https://kafka.apache.org/documentation.html#newconsumerconfigs . What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): *earliest*: automatically reset the offset to the earliest offset *latest*: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer's group anything else: throw exception to the consumer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?
sijie commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker? URL: https://github.com/apache/pulsar/issues/2858#issuecomment-433592171 @benjaminhuo > Official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker the helm chart was contributed from a production deployment in google cloud. so those values are set to pretty high. I think we can improve it by introducing 3 different groups of settings, e.g. small, medium and large. so people can choose the proper settings to deploy to their environment. I am wondering if you are interested in contributing a change to this helm chart :) > The manifest(bookie.yaml) deploy bookeeper as daemonset, why not statefulset with persistent volume claim? I think helm chart deploys bookkeeper as statefulset. https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/bookkeeper-statefulset.yaml You mean the kubernetes manifest here: https://github.com/apache/pulsar/blob/master/deployment/kubernetes/generic/bookie.yaml#L33 The reason why it is using daemonset for generic deployment is following: as a generic deployment, people might not enable any persistent volumes enabled in their k8s environment. so for they to tryout pulsar quickly, daemonset is an easy solution to start. In cloud environment, for example, aws and google cloud, we are recommending people deploying using statefulset. https://github.com/apache/pulsar/blob/master/deployment/kubernetes/aws/bookkeeper.yaml#L67 but since k8s is now supporting local volume claim, I think we can change generic deployment to use statefulset with local volume claim. wondering if you are interested in contributing to this as well? Hope this answer your question? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie closed pull request #2814: Fix memory issue in cpp ZTSClient
sijie closed pull request #2814: Fix memory issue in cpp ZTSClient URL: https://github.com/apache/pulsar/pull/2814 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc index 3097c30c65..94671241e2 100644 --- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc +++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc @@ -141,8 +141,16 @@ std::string ZTSClient::ybase64Encode(const unsigned char *input, int length) { } char *ZTSClient::base64Decode(const char *input) { -BIO *bio, *b64; +if (input == NULL) { +return NULL; +} + size_t length = strlen(input); +if (length == 0) { +return NULL; +} + +BIO *bio, *b64; char *result = (char *)malloc(length); bio = BIO_new_mem_buf((void *)input, -1); @@ -150,16 +158,21 @@ char *ZTSClient::base64Decode(const char *input) { bio = BIO_push(b64, bio); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); -BIO_read(bio, result, length); +int decodeStrLen = BIO_read(bio, result, length); BIO_free_all(bio); +if (decodeStrLen > 0) { +result[decodeStrLen] = '\0'; +return result; +} +free(result); -return result; +return NULL; } const std::string ZTSClient::getPrincipalToken() const { // construct unsigned principal token std::string unsignedTokenString = "v=S1"; -char host[BUFSIZ]; +char host[BUFSIZ] = {}; long long t = (long long)time(NULL); gethostname(host, sizeof(host)); @@ -176,8 +189,8 @@ const std::string ZTSClient::getPrincipalToken() const { // signing const char *unsignedToken = unsignedTokenString.c_str(); -unsigned char signature[BUFSIZ]; -unsigned char hash[SHA256_DIGEST_LENGTH]; +unsigned char signature[BUFSIZ] = {}; +unsigned char hash[SHA256_DIGEST_LENGTH] = {}; unsigned int siglen; FILE *fp; RSA *privateKey; @@ -189,14 +202,21 @@ const std::string ZTSClient::getPrincipalToken() const { } char *decodeStr = base64Decode(privateKeyUri_.data.c_str()); +if (decodeStr == NULL) { +LOG_ERROR("Failed to decode privateKey"); +return ""; +} + BIO *bio = BIO_new_mem_buf((void *)decodeStr, -1); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); if (bio == NULL) { LOG_ERROR("Failed to create key BIO"); +free(decodeStr); return ""; } privateKey = PEM_read_bio_RSAPrivateKey(bio, NULL, NULL, NULL); BIO_free(bio); +free(decodeStr); if (privateKey == NULL) { LOG_ERROR("Failed to load privateKey"); return ""; @@ -225,6 +245,8 @@ const std::string ZTSClient::getPrincipalToken() const { std::string principalToken = unsignedTokenString + ";s=" + ybase64Encode(signature, siglen); LOG_DEBUG("Created signed principal token: " << principalToken); +RSA_free(privateKey); + return principalToken; } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Fix memory issue in cpp ZTSClient (#2814)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c518247 Fix memory issue in cpp ZTSClient (#2814) c518247 is described below commit c518247d873bfe07364359e5f63f5fd7f689c0e9 Author: hrsakai AuthorDate: Sat Oct 27 14:09:27 2018 +0900 Fix memory issue in cpp ZTSClient (#2814) ### Modifications * Use `calloc` instead of `malloc` in order to add Termination character(`\0`). * Free memory allocated with `calloc` and `PEM_read_bio_RSAPrivateKey`. --- pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc | 34 +- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc index 3097c30..9467124 100644 --- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc +++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc @@ -141,8 +141,16 @@ std::string ZTSClient::ybase64Encode(const unsigned char *input, int length) { } char *ZTSClient::base64Decode(const char *input) { -BIO *bio, *b64; +if (input == NULL) { +return NULL; +} + size_t length = strlen(input); +if (length == 0) { +return NULL; +} + +BIO *bio, *b64; char *result = (char *)malloc(length); bio = BIO_new_mem_buf((void *)input, -1); @@ -150,16 +158,21 @@ char *ZTSClient::base64Decode(const char *input) { bio = BIO_push(b64, bio); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); -BIO_read(bio, result, length); +int decodeStrLen = BIO_read(bio, result, length); BIO_free_all(bio); +if (decodeStrLen > 0) { +result[decodeStrLen] = '\0'; +return result; +} +free(result); -return result; +return NULL; } const std::string ZTSClient::getPrincipalToken() const { // construct unsigned principal token std::string unsignedTokenString = "v=S1"; -char host[BUFSIZ]; +char host[BUFSIZ] = {}; long long t = (long long)time(NULL); gethostname(host, sizeof(host)); @@ -176,8 +189,8 @@ const std::string ZTSClient::getPrincipalToken() const { // signing const char *unsignedToken = unsignedTokenString.c_str(); -unsigned char signature[BUFSIZ]; -unsigned char hash[SHA256_DIGEST_LENGTH]; +unsigned char signature[BUFSIZ] = {}; +unsigned char hash[SHA256_DIGEST_LENGTH] = {}; unsigned int siglen; FILE *fp; RSA *privateKey; @@ -189,14 +202,21 @@ const std::string ZTSClient::getPrincipalToken() const { } char *decodeStr = base64Decode(privateKeyUri_.data.c_str()); +if (decodeStr == NULL) { +LOG_ERROR("Failed to decode privateKey"); +return ""; +} + BIO *bio = BIO_new_mem_buf((void *)decodeStr, -1); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); if (bio == NULL) { LOG_ERROR("Failed to create key BIO"); +free(decodeStr); return ""; } privateKey = PEM_read_bio_RSAPrivateKey(bio, NULL, NULL, NULL); BIO_free(bio); +free(decodeStr); if (privateKey == NULL) { LOG_ERROR("Failed to load privateKey"); return ""; @@ -225,6 +245,8 @@ const std::string ZTSClient::getPrincipalToken() const { std::string principalToken = unsignedTokenString + ";s=" + ybase64Encode(signature, siglen); LOG_DEBUG("Created signed principal token: " << principalToken); +RSA_free(privateKey); + return principalToken; }
[pulsar] branch master updated: Added ability for the kubernetes to poll a configmap to look out for changes (#2856)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2d208c5 Added ability for the kubernetes to poll a configmap to look out for changes (#2856) 2d208c5 is described below commit 2d208c565e3bd25a1cc12d904f72b91f96ec6cab Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 22:07:54 2018 -0700 Added ability for the kubernetes to poll a configmap to look out for changes (#2856) ### Motivation While running functions worker in kubernetes, it might be hard to update any kind of config of the worker without killing it. This pr allows certain properties of the functions to be stored in a configmap. The KubernetesRuntime Factory will keep an eye for any changes and apply the changes on the fly dramatically simplifying any config updates --- .../runtime/KubernetesRuntimeFactory.java | 126 +++-- .../functions/runtime/KubernetesRuntimeTest.java | 4 +- .../functions/worker/FunctionRuntimeManager.java | 4 +- .../pulsar/functions/worker/WorkerConfig.java | 5 + 4 files changed, 101 insertions(+), 38 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 1a180ae..c6d5d02 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -24,13 +24,20 @@ import io.kubernetes.client.ApiClient; import io.kubernetes.client.Configuration; import io.kubernetes.client.apis.AppsV1Api; import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.models.V1ConfigMap; import io.kubernetes.client.util.Config; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; +import java.lang.reflect.Field; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -40,24 +47,33 @@ import static org.apache.commons.lang3.StringUtils.isEmpty; @Slf4j public class KubernetesRuntimeFactory implements RuntimeFactory { -private final String k8Uri; -private final String jobNamespace; -private final String pulsarDockerImageName; -private final String pulsarRootDir; +@Getter +@Setter +@NoArgsConstructor +class KubernetesInfo { +private String k8Uri; +private String jobNamespace; +private String pulsarDockerImageName; +private String pulsarRootDir; +private String pulsarAdminUrl; +private String pulsarServiceUrl; +private String pythonDependencyRepository; +private String pythonExtraDependencyRepository; +private String changeConfigMap; +private String changeConfigMapNamespace; +} +private final KubernetesInfo kubernetesInfo; private final Boolean submittingInsidePod; private final Boolean installUserCodeDependencies; -private final String pythonDependencyRepository; -private final String pythonExtraDependencyRepository; private final Map customLabels; -private final String pulsarAdminUri; -private final String pulsarServiceUri; +private final Integer expectedMetricsCollectionInterval; private final String stateStorageServiceUri; private final AuthenticationConfig authConfig; private final String javaInstanceJarFile; private final String pythonInstanceFile; private final String prometheusMetricsServerJarFile; private final String logDirectory = "logs/functions"; -private final Integer expectedMetricsInterval; +private Timer changeConfigMapTimer; private AppsV1Api appsClient; private CoreV1Api coreClient; @@ -75,36 +91,41 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { String pulsarAdminUri, String stateStorageServiceUri, AuthenticationConfig authConfig, -Integer expectedMetricsInterval) { -this.k8Uri = k8Uri; +Integer expectedMetricsCollectionInterval, +String changeConfigMap, +String changeConfigMapNamespace) { +this.kubernetesInfo = new KubernetesInfo(); +this.kubernetesInfo.setK8Uri(k8Uri); if
[GitHub] sijie closed pull request #2857: Reduce memory reserved for prometheus
sijie closed pull request #2857: Reduce memory reserved for prometheus URL: https://github.com/apache/pulsar/pull/2857 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 80eb8400af..7f3f72d44b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -66,7 +66,7 @@ private static final Integer GRPC_PORT = 9093; private static final Integer PROMETHEUS_PORT = 9094; private static final Double prometheusMetricsServerCpu = 0.1; -private static final Long prometheusMetricsServerRam = 25000l; +private static final Long prometheusMetricsServerRam = 12500l; public static final Pattern VALID_POD_NAME_REGEX = Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*", Pattern.CASE_INSENSITIVE); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] benjaminhuo opened a new issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?
benjaminhuo opened a new issue #2858: What official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker? URL: https://github.com/apache/pulsar/issues/2858 Expected behavior The memory should be set to a reasonable size Actual behavior Tell us what happens instead Official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker ![snip20181027_103](https://user-images.githubusercontent.com/18525465/47598862-1cd85080-d9d5-11e8-8201-451d6c377486.png) ![snip20181027_104](https://user-images.githubusercontent.com/18525465/47598863-1cd85080-d9d5-11e8-8728-46d4cb3b6d42.png) ![snip20181027_107](https://user-images.githubusercontent.com/18525465/47598864-1cd85080-d9d5-11e8-89c3-8d8b43343c2b.png) ![snip20181027_108](https://user-images.githubusercontent.com/18525465/47598865-1d70e700-d9d5-11e8-8c26-0f718f609173.png) Steps to reproduce System configuration **Pulsar version**: x.y This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2857: Reduce memory reserved for prometheus
srkukarni opened a new pull request #2857: Reduce memory reserved for prometheus URL: https://github.com/apache/pulsar/pull/2857 ### Motivation Explain here the context, and why you're making that change. What is the problem you're trying to solve. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes
srkukarni opened a new pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes URL: https://github.com/apache/pulsar/pull/2856 ### Motivation While running functions worker in kubernetes, it might be hard to update any kind of config of the worker without killing it. This pr allows certain properties of the functions to be stored in a configmap. The KubernetesRuntime Factory will keep an eye for any changes and apply the changes on the fly dramatically simplifying any config updates ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2855: Secretprovider interfaces and some default implementations
srkukarni opened a new pull request #2855: Secretprovider interfaces and some default implementations URL: https://github.com/apache/pulsar/pull/2855 ### Motivation This pr defines the SecretsProviderConfigurator that configures the function instances/containers and plugs in the right SecretsProvider for the instances to use. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2855: Secretprovider interfaces and some default implementations
srkukarni commented on issue #2855: Secretprovider interfaces and some default implementations URL: https://github.com/apache/pulsar/pull/2855#issuecomment-433562824 @cckellogg This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433541994 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #2852: Add connection timeout client configuration option
merlimat commented on issue #2852: Add connection timeout client configuration option URL: https://github.com/apache/pulsar/pull/2852#issuecomment-433535510 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat opened a new pull request #2854: Fixed ZLib decompression in C++ client
merlimat opened a new pull request #2854: Fixed ZLib decompression in C++ client URL: https://github.com/apache/pulsar/pull/2854 ### Motivation The ZLib method `uncompress()` is not able to process data that was compressed from Pulsar Java producer with ZLib. This is due to the the initialization settings that are used inside the `uncompress()` functions and that are different from how ZLib is used within the JDK. This causes C++ (and derivatives) consumers to drop messages that were compressed by Java producer. A Java consumer is able to correctly decompress both. ### Modifications * Fixed decompression in C++ to be able to inflate both zstream terminations * Added option in C++ perf producer to compress messages This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433526803 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433515376 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433515089 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
merlimat closed pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index e084b04622..7f1b5aa45c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -358,7 +358,7 @@ public void testProxyStats() throws Exception { + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/"; final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic; -System.out.println(consumerUri+", "+producerUri); +System.out.println(consumerUri + ", " + producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); URI readUri = URI.create(readerUri); @@ -424,6 +424,47 @@ public void testProxyStats() throws Exception { } } +@Test(timeOut = 1) +public void consumeMessagesInPartitionedTopicTest() throws Exception { +final String namespace = "my-property/my-ns"; +final String topic = namespace + "/" + "my-topic7"; +admin.topics().createPartitionedTopic("persistent://" + topic, 3); + +final String subscription = "my-sub"; +final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; +final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic; + +URI consumeUri = URI.create(consumerUri); +URI produceUri = URI.create(producerUri); + +WebSocketClient consumeClient = new WebSocketClient(); +WebSocketClient produceClient = new WebSocketClient(); + +SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); +SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + +try { +produceClient.start(); +ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); +Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); +producerFuture.get(); +produceSocket.sendMessage(100); +} finally { +stopWebSocketClient(produceClient); +} + +Thread.sleep(500); + +try { +consumeClient.start(); +ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); +Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); +consumerFuture.get(); +} finally { +stopWebSocketClient(consumeClient); +} +} + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java index 1bb3e086ed..4d38aa949d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java @@ -22,6 +22,7 @@ import java.io.Serializable; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; /** * Opaque unique identifier of a single message @@ -49,6 +50,10 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return MessageIdImpl.fromByteArray(data); } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +return MessageIdImpl.fromByteArrayWithTopic(data, topicName); +} + public static final MessageId earliest = new MessageIdImpl(-1, -1, -1); public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 3686b124a8..5a53436e7b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.MessageId;
[GitHub] merlimat commented on issue #1070: Provide a nodejs client
merlimat commented on issue #1070: Provide a nodejs client URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433491319 @mujimu There are indeed plans to start soon working on this. Thanks for the link. If you have JS experience it would be also good to have feedback on API ergonomics once we start working on it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Fix Websocket Consume Messages in Partitioned Topics (#2829)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d79499d Fix Websocket Consume Messages in Partitioned Topics (#2829) d79499d is described below commit d79499dbe24fbd8b065489875ba86a47761ec33f Author: Yuto Furuta AuthorDate: Sat Oct 27 02:53:09 2018 +0900 Fix Websocket Consume Messages in Partitioned Topics (#2829) * fix consume messages in partitioned topics on websocket * add consumeMessagesInPartitionedTopicTest * add fromByteArrayWithTopic * remove public --- .../websocket/proxy/ProxyPublishConsumeTest.java | 43 +- .../org/apache/pulsar/client/api/MessageId.java| 5 +++ .../apache/pulsar/client/impl/MessageIdImpl.java | 31 .../apache/pulsar/websocket/ConsumerHandler.java | 3 +- 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index e084b04..7f1b5aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -358,7 +358,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/"; final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic; -System.out.println(consumerUri+", "+producerUri); +System.out.println(consumerUri + ", " + producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); URI readUri = URI.create(readerUri); @@ -424,6 +424,47 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } } +@Test(timeOut = 1) +public void consumeMessagesInPartitionedTopicTest() throws Exception { +final String namespace = "my-property/my-ns"; +final String topic = namespace + "/" + "my-topic7"; +admin.topics().createPartitionedTopic("persistent://" + topic, 3); + +final String subscription = "my-sub"; +final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; +final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic; + +URI consumeUri = URI.create(consumerUri); +URI produceUri = URI.create(producerUri); + +WebSocketClient consumeClient = new WebSocketClient(); +WebSocketClient produceClient = new WebSocketClient(); + +SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); +SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + +try { +produceClient.start(); +ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); +Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); +producerFuture.get(); +produceSocket.sendMessage(100); +} finally { +stopWebSocketClient(produceClient); +} + +Thread.sleep(500); + +try { +consumeClient.start(); +ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); +Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); +consumerFuture.get(); +} finally { +stopWebSocketClient(consumeClient); +} +} + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java index 1bb3e08..4d38aa9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.Serializable; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; /** * Opaque unique identifier of a single message @@ -49,6 +50,10 @@ public interface MessageId extends Comparable, Serializable { return MessageIdImpl.fromByteArray(data); } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +return
[GitHub] jerrypeng commented on a change in pull request #2826: Secrets Interface
jerrypeng commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228607992 ## File path: pulsar-functions/secretsprovider/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.secretsproviderconfigurator; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import io.kubernetes.client.models.V1Container; +import io.kubernetes.client.models.V1EnvVar; +import io.kubernetes.client.models.V1EnvVarSource; +import io.kubernetes.client.models.V1SecretKeySelector; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; + +import java.lang.reflect.Type; +import java.util.Map; + +/** + * Context provides contextual information to the executing function. + * Features like which message id we are handling, whats the topic name of the + * message, what are our operating constraints, etc can be accessed by the + * executing function + */ +public class KubernetesSecretsProviderConfigurator implements SecretsProviderConfigurator { +@Override +public void init(Map config) { + +} + +@Override +public String getSecretsProviderClassName(Function.FunctionDetails functionDetails) { +switch (functionDetails.getRuntime()) { +case JAVA: +return EnvironmentBasedSecretsProvider.class.getName(); +case PYTHON: +return "secretsprovider.EnvironmentBasedSecretsProvider"; +default: +throw new RuntimeException("Unknown function runtime " + functionDetails.getRuntime()); +} +} + +@Override +public Map getSecretsProviderConfig(Function.FunctionDetails functionDetails) { +return null; +} + +@Override +public void configureKubernetesRuntimeSecretsProvider(V1Container container, Function.FunctionDetails functionDetails) { +if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { +Type type = new TypeToken>() { +}.getType(); +Map secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); Review comment: Perhaps its worthwhile to explicitly define the structure of the "Object" for kubernetes so it is clear what configs should be there. You can just probably declare a class that represents the Object This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #2826: Secrets Interface
jerrypeng commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228606081 ## File path: pulsar-functions/secretsprovider/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java ## @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.secretsprovider; + +import java.util.Map; + +/** + * Context provides contextual information to the executing function. Review comment: I don't think this is the right comment This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: adding Python function instance unit test (#2850)
This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1a1cf22 adding Python function instance unit test (#2850) 1a1cf22 is described below commit 1a1cf226a718fbaece393e8e1ed9584b59a8752b Author: Boyang Jerry Peng AuthorDate: Fri Oct 26 10:14:01 2018 -0700 adding Python function instance unit test (#2850) * adding python function instance unittests * adding license headers --- bin/pulsar | 2 +- bin/pulsar-admin | 2 +- distribution/server/src/assemble/bin.xml | 2 +- pulsar-client-cpp/run-unit-tests.sh| 4 ++ pulsar-functions/instance/pom.xml | 31 .../src/scripts/run_python_instance_tests.sh | 30 +++ .../src/test/python/test_python_instance.py| 58 ++ pulsar-functions/runtime/pom.xml | 28 --- 8 files changed, 126 insertions(+), 31 deletions(-) diff --git a/bin/pulsar b/bin/pulsar index 9cacfe7..02a81a0 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -94,7 +94,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then echo "\nCouldn't find pulsar-functions python instance."; echo "Make sure you've run 'mvn package'\n"; diff --git a/bin/pulsar-admin b/bin/pulsar-admin index 1a1339d..39d1239 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -130,7 +130,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} else diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml index c9ad893..d17d4ff 100644 --- a/distribution/server/src/assemble/bin.xml +++ b/distribution/server/src/assemble/bin.xml @@ -44,7 +44,7 @@ ${basedir}/licenses - ${basedir}/../../pulsar-functions/runtime/target/python-instance + ${basedir}/../../pulsar-functions/instance/target/python-instance instances/python-instance diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh index ed4b6b4..cffe002 100755 --- a/pulsar-client-cpp/run-unit-tests.sh +++ b/pulsar-client-cpp/run-unit-tests.sh @@ -88,6 +88,10 @@ if [ $RES -eq 0 ]; then python pulsar_test.py RES=$? +echo " Running Python Function Instance unit tests" +bash /pulsar/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh +RES=$? + popd popd diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index e206b11..1c8f3ab 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -103,4 +103,35 @@ + + + + +org.apache.maven.plugins +maven-antrun-plugin + + +compile + + run + + + +building python instance + + + + + + + + + + + + + + diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh new file mode 100644 index 000..9b33c24 --- /dev/null +++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed
[pulsar] branch master updated: Cleanup Logging for python functions (#2847)
This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fa212bc Cleanup Logging for python functions (#2847) fa212bc is described below commit fa212bc8bb0cbd81729b498b6041cde794be9684 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 10:13:39 2018 -0700 Cleanup Logging for python functions (#2847) * Cleanup Logging for python functions * More statements to debug * More debug statements * More debug --- .../instance/src/main/python/python_instance.py| 8 ++--- .../instance/src/main/python/server.py | 8 ++--- pulsar-functions/instance/src/main/python/util.py | 37 ++ 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 03dafae..5f1645d 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -187,7 +187,7 @@ class PythonInstance(object): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, @@ -201,7 +201,7 @@ class PythonInstance(object): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: self.consumers[topic] = self.pulsar_client.subscribe( re.compile(str(topic)), subscription_name, @@ -237,7 +237,7 @@ class PythonInstance(object): Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() def actual_execution(self): -Log.info("Started Thread for executing the function") +Log.debug("Started Thread for executing the function") while True: msg = self.queue.get(True) if isinstance(msg, InternalQuitMessage): @@ -321,7 +321,7 @@ class PythonInstance(object): def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: - Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 611a737..58d43d2 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -35,20 +35,20 @@ class InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr self.pyinstance = pyinstance def GetFunctionStatus(self, request, context): -Log.info("Came in GetFunctionStatus") +Log.debug("Came in GetFunctionStatus") return self.pyinstance.get_function_status() def GetAndResetMetrics(self, request, context): -Log.info("Came in GetAndResetMetrics") +Log.debug("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() def ResetMetrics(self, request, context): -Log.info("Came in ResetMetrics") +Log.debug("Came in ResetMetrics") self.pyinstance.reset_metrics() return request def GetMetrics(self, request, context): -Log.info("Came in GetMetrics") +Log.debug("Came in GetMetrics") return self.pyinstance.get_metrics() def HealthCheck(self, request, context): diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 4736457..56c1ce1 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -36,38 +36,33 @@ PULSAR_FUNCTIONS_API_ROOT = 'functions' def import_class(from_path, full_class_name): from_path = str(from_path) full_class_name = str(full_class_name) - kclass = import_class_from_path(from_path, full_class_name) - if kclass is None: + try: +
[GitHub] jerrypeng closed pull request #2850: adding Python function instance unit test
jerrypeng closed pull request #2850: adding Python function instance unit test URL: https://github.com/apache/pulsar/pull/2850 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/pulsar b/bin/pulsar index 9cacfe7782..02a81a04ef 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -94,7 +94,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then echo "\nCouldn't find pulsar-functions python instance."; echo "Make sure you've run 'mvn package'\n"; diff --git a/bin/pulsar-admin b/bin/pulsar-admin index 1a1339d8ab..39d12393b2 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -130,7 +130,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} else diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml index c9ad893d56..d17d4ff39f 100644 --- a/distribution/server/src/assemble/bin.xml +++ b/distribution/server/src/assemble/bin.xml @@ -44,7 +44,7 @@ ${basedir}/licenses - ${basedir}/../../pulsar-functions/runtime/target/python-instance + ${basedir}/../../pulsar-functions/instance/target/python-instance instances/python-instance diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh index ed4b6b4a1c..cffe0023b7 100755 --- a/pulsar-client-cpp/run-unit-tests.sh +++ b/pulsar-client-cpp/run-unit-tests.sh @@ -88,6 +88,10 @@ if [ $RES -eq 0 ]; then python pulsar_test.py RES=$? +echo " Running Python Function Instance unit tests" +bash /pulsar/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh +RES=$? + popd popd diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index e206b111cb..1c8f3ab78e 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -103,4 +103,35 @@ + + + + +org.apache.maven.plugins +maven-antrun-plugin + + +compile + + run + + + +building python instance + + + + + + + + + + + + + + diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh new file mode 100644 index 00..9b33c24481 --- /dev/null +++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# Make sure dependencies are installed +pip install mock --user +pip install protobuf --user + +CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +PULSAR_HOME=$CUR_DIR/../../../../ + +# run instance tests +PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests \ No newline at end of file diff --git
[GitHub] jerrypeng closed pull request #2847: Cleanup Logging for python functions
jerrypeng closed pull request #2847: Cleanup Logging for python functions URL: https://github.com/apache/pulsar/pull/2847 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 03dafaef2f..5f1645dd74 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -187,7 +187,7 @@ def run(self): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, @@ -201,7 +201,7 @@ def run(self): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: self.consumers[topic] = self.pulsar_client.subscribe( re.compile(str(topic)), subscription_name, @@ -237,7 +237,7 @@ def run(self): Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() def actual_execution(self): -Log.info("Started Thread for executing the function") +Log.debug("Started Thread for executing the function") while True: msg = self.queue.get(True) if isinstance(msg, InternalQuitMessage): @@ -321,7 +321,7 @@ def setup_output_serde(self): def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: - Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 611a737031..58d43d204d 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -35,20 +35,20 @@ def __init__(self, pyinstance): self.pyinstance = pyinstance def GetFunctionStatus(self, request, context): -Log.info("Came in GetFunctionStatus") +Log.debug("Came in GetFunctionStatus") return self.pyinstance.get_function_status() def GetAndResetMetrics(self, request, context): -Log.info("Came in GetAndResetMetrics") +Log.debug("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() def ResetMetrics(self, request, context): -Log.info("Came in ResetMetrics") +Log.debug("Came in ResetMetrics") self.pyinstance.reset_metrics() return request def GetMetrics(self, request, context): -Log.info("Came in GetMetrics") +Log.debug("Came in GetMetrics") return self.pyinstance.get_metrics() def HealthCheck(self, request, context): diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 4736457bbc..56c1ce1c9f 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -36,38 +36,33 @@ def import_class(from_path, full_class_name): from_path = str(from_path) full_class_name = str(full_class_name) - kclass = import_class_from_path(from_path, full_class_name) - if kclass is None: + try: +return import_class_from_path(from_path, full_class_name) + except Exception as e: our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe( api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT) -kclass = import_class_from_path(api_dir, full_class_name) - return kclass +try: + return import_class_from_path(api_dir, full_class_name) +except Exception as e: + Log.info("Failed to import class %s from path %s" % (full_class_name, from_path)) + Log.info(e, exc_info=True) + return None def import_class_from_path(from_path,
[GitHub] srkukarni opened a new pull request #2853: Secrets Frontend
srkukarni opened a new pull request #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853 ### Motivation This pr defines the FunctionConfig part of secrets implementation. It defines how users can define secrets in their function/source/sink configs and how they get translated to FunctionDetails. Follow up prs to actually implement secrets will follow this one. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433473480 @cckellogg Please take a look. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2826: Secrets Interface
srkukarni commented on issue #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#issuecomment-433470126 I will break the change into smaller changes and it might be best to comment on each individual pr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] mujimu commented on issue #1070: Provide a nodejs client
mujimu commented on issue #1070: Provide a nodejs client URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433457173 more recent example of wrapping an underlying C++ lib as a node library: https://github.com/Blizzard/node-rdkafka This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] cckellogg commented on a change in pull request #2826: Secrets Interface
cckellogg commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228576463 ## File path: pulsar-functions/instance/src/main/python/secretsprovider.py ## @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# -*- encoding: utf-8 -*- + +"""secretsprovider.py: Interfaces and definitions for Secret Providers +""" +from abc import abstractmethod +import os + +class SecretsProvider: Review comment: I think this ok now i think about it more. More implementations can be added later This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] cckellogg commented on a change in pull request #2826: Secrets Interface
cckellogg commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228569533 ## File path: pulsar-functions/instance/src/main/python/secretsprovider.py ## @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# -*- encoding: utf-8 -*- + +"""secretsprovider.py: Interfaces and definitions for Secret Providers +""" +from abc import abstractmethod +import os + +class SecretsProvider: Review comment: I think the implementation for these should be provider specific. Like KuberberneteSecretProvider and that provider know how to read the secret in the container. Same for java. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #2852: Add connection timeout client configuration option
ivankelly commented on issue #2852: Add connection timeout client configuration option URL: https://github.com/apache/pulsar/pull/2852#issuecomment-433399375 @cckellogg This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly opened a new pull request #2852: Add connection timeout client configuration option
ivankelly opened a new pull request #2852: Add connection timeout client configuration option URL: https://github.com/apache/pulsar/pull/2852 Allows the client to specify how long to wait for brokers to respond. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2826: Secrets Interface
srkukarni commented on issue #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#issuecomment-433365590 @merlimat @cckellogg I've made the changes as requested. Please take a look again This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Use the right field to get the filename (#2851)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 138c863 Use the right field to get the filename (#2851) 138c863 is described below commit 138c863af0a43bd3423a7f3fb3846d2cb8dfe472 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 02:57:03 2018 -0700 Use the right field to get the filename (#2851) --- .../org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 087238c..89e3b48 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -243,8 +243,8 @@ public class FunctionsImpl { Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace()); } else { packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, -fileDetail.getName())); - packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName()); +fileDetail.getFileName())); + packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace()); }
[GitHub] srkukarni closed pull request #2848: Make IdentitySerde handle bytes as well
srkukarni closed pull request #2848: Make IdentitySerde handle bytes as well URL: https://github.com/apache/pulsar/pull/2848 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py b/pulsar-client-cpp/python/pulsar/functions/serde.py index ee6366cf63..968c1c9bc3 100644 --- a/pulsar-client-cpp/python/pulsar/functions/serde.py +++ b/pulsar-client-cpp/python/pulsar/functions/serde.py @@ -75,7 +75,9 @@ def __init__(self): def serialize(self, input): if type(input) in self._types: return str(input).encode('utf-8') -raise TypeError +if type(input) == bytes: + return input +raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) def deserialize(self, input_bytes): for typ in self._types: @@ -83,4 +85,4 @@ def deserialize(self, input_bytes): return typ(input_bytes.decode('utf-8')) except: pass -raise TypeError +return input_bytes This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829#discussion_r228456096 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java ## @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return messageId; } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +checkNotNull(data); +ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length)); +PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder(); + +PulsarApi.MessageIdData idData; +try { +idData = builder.mergeFrom(inputStream, null).build(); +} catch (UninitializedMessageException e) { +throw new IOException(e); +} + +MessageId messageId; +if (idData.hasBatchIndex()) { +messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), +idData.getBatchIndex()); +} else { +messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); +} +if (idData.getPartition() > -1 && topicName != null) { Review comment: Oh, we need to pass it to the constructor... sounds odd though. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829#discussion_r228454245 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java ## @@ -31,7 +31,7 @@ private final String topicName; private final MessageId messageId; -TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { +public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { Review comment: Is this public still needed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on issue #2814: Fix memory issue in cpp ZTSClient
maskit commented on issue #2814: Fix memory issue in cpp ZTSClient URL: https://github.com/apache/pulsar/pull/2814#issuecomment-47349 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2851: Use the right field to get the filename
srkukarni commented on issue #2851: Use the right field to get the filename URL: https://github.com/apache/pulsar/pull/2851#issuecomment-41679 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2848: Make IdentitySerde handle bytes as well
srkukarni commented on issue #2848: Make IdentitySerde handle bytes as well URL: https://github.com/apache/pulsar/pull/2848#issuecomment-41483 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Fix: schema init for topic-name with special char (#2836)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 000cc22 Fix: schema init for topic-name with special char (#2836) 000cc22 is described below commit 000cc22277563c208237ea2bb6ae1ca53435d88f Author: Rajan Dhabalia AuthorDate: Thu Oct 25 23:35:06 2018 -0700 Fix: schema init for topic-name with special char (#2836) * Fix: schema init for topic-name with special char create namespace before starting test fix test * fix child test --- .../broker/service/BrokerBkEnsemblesTests.java | 32 -- .../pulsar/broker/service/RackAwareTest.java | 5 .../org/apache/pulsar/common/naming/TopicName.java | 2 +- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index cc82ff4..b7303c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -70,8 +70,7 @@ public class BrokerBkEnsemblesTests { LocalBookkeeperEnsemble bkEnsemble; -private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); -protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); +protected int BROKER_WEBSERVICE_PORT; private final int numberOfBookies; @@ -86,6 +85,8 @@ public class BrokerBkEnsemblesTests { @BeforeMethod protected void setup() throws Exception { try { +int ZOOKEEPER_PORT = PortManager.nextFreePort(); +BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); // start local bookie and zookeeper bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); @@ -340,5 +341,32 @@ public class BrokerBkEnsemblesTests { client.close(); } +@Test(timeOut=2) +public void testTopicWithWildCardChar() throws Exception { +PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) +.build(); + +final String ns1 = "prop/usc/topicWithSpecialChar"; +try { +admin.namespaces().createNamespace(ns1); +} catch (Exception e) { + +} + +final String topic1 = "persistent://"+ns1+"/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524"; +final String subName1 = "c1"; +final byte[] content = "test".getBytes(); + +Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); +org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + +producer.send(content); +Message msg = consumer.receive(); +Assert.assertEquals(msg.getData(), content); +consumer.close(); +producer.close(); +client.close(); +} + private static final Logger LOG = LoggerFactory.getLogger(BrokerBkEnsemblesTests.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java index ada7b9f..daa6fe5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java @@ -130,5 +130,10 @@ public class RackAwareTest extends BrokerBkEnsemblesTests { // Ignore test } +@Test(enabled = false) +public void testTopicWithWildCardChar() throws Exception { +// Ignore test +} + private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index bde252c..eef05f3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -337,7 +337,7 @@ public class TopicName implements ServiceUnitId { public String getSchemaName() { return getTenant() + "/" + getNamespacePortion() -+ "/" + getLocalName(); ++ "/" + getEncodedLocalName(); } @Override
[GitHub] merlimat closed pull request #2836: Fix: schema init for topic-name with special char
merlimat closed pull request #2836: Fix: schema init for topic-name with special char URL: https://github.com/apache/pulsar/pull/2836 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index cc82ff4c55..b7303c65aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -70,8 +70,7 @@ LocalBookkeeperEnsemble bkEnsemble; -private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); -protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); +protected int BROKER_WEBSERVICE_PORT; private final int numberOfBookies; @@ -86,6 +85,8 @@ public BrokerBkEnsemblesTests(int numberOfBookies) { @BeforeMethod protected void setup() throws Exception { try { +int ZOOKEEPER_PORT = PortManager.nextFreePort(); +BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); // start local bookie and zookeeper bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort()); bkEnsemble.start(); @@ -340,5 +341,32 @@ public void testSkipCorruptDataLedger() throws Exception { client.close(); } +@Test(timeOut=2) +public void testTopicWithWildCardChar() throws Exception { +PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) +.build(); + +final String ns1 = "prop/usc/topicWithSpecialChar"; +try { +admin.namespaces().createNamespace(ns1); +} catch (Exception e) { + +} + +final String topic1 = "persistent://"+ns1+"/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524"; +final String subName1 = "c1"; +final byte[] content = "test".getBytes(); + +Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); +org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + +producer.send(content); +Message msg = consumer.receive(); +Assert.assertEquals(msg.getData(), content); +consumer.close(); +producer.close(); +client.close(); +} + private static final Logger LOG = LoggerFactory.getLogger(BrokerBkEnsemblesTests.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java index ada7b9f171..daa6fe5ee9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java @@ -130,5 +130,10 @@ public void testSkipCorruptDataLedger() throws Exception { // Ignore test } +@Test(enabled = false) +public void testTopicWithWildCardChar() throws Exception { +// Ignore test +} + private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index bde252c107..eef05f3b18 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -337,7 +337,7 @@ public boolean isGlobal() { public String getSchemaName() { return getTenant() + "/" + getNamespacePortion() -+ "/" + getLocalName(); ++ "/" + getEncodedLocalName(); } @Override This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Make proxy advertise protocol version of client to broker (#2845)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5bdcdc5 Make proxy advertise protocol version of client to broker (#2845) 5bdcdc5 is described below commit 5bdcdc584834714b7c660cbad84d748ec7b98fee Author: massakam AuthorDate: Fri Oct 26 15:34:52 2018 +0900 Make proxy advertise protocol version of client to broker (#2845) * Make proxy advertise protocol version of client to broker * Revert incorrect change --- .../org/apache/pulsar/client/impl/ClientCnx.java | 10 +++- .../org/apache/pulsar/common/api/Commands.java | 1 - .../pulsar/proxy/server/DirectProxyHandler.java| 13 +++-- .../apache/pulsar/proxy/server/ProxyClientCnx.java | 40 --- .../pulsar/proxy/server/ProxyConnection.java | 28 --- .../org/apache/pulsar/proxy/server/ProxyTest.java | 58 ++ 6 files changed, 117 insertions(+), 33 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 9306a82..ccc20b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -110,6 +110,7 @@ public class ClientCnx extends PulsarHandler { private volatile int numberOfRejectRequests = 0; private final int maxNumberOfRejectedRequestPerConnection; private final int rejectedRequestResetTimeSec = 60; +private final int protocolVersion; private final long operationTimeoutMs; protected String proxyToTargetBrokerAddress = null; @@ -123,6 +124,10 @@ public class ClientCnx extends PulsarHandler { } public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { +this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion()); +} + +public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) { super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest()); this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true); @@ -135,6 +140,7 @@ public class ClientCnx extends PulsarHandler { this.state = State.None; this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable(); this.hostnameVerifier = new DefaultHostnameVerifier(); +this.protocolVersion = protocolVersion; } @Override @@ -167,8 +173,8 @@ public class ClientCnx extends PulsarHandler { if (authentication.getAuthData().hasDataFromCommand()) { authData = authentication.getAuthData().getCommandData(); } -return Commands.newConnect(authentication.getAuthMethodName(), authData, -getPulsarClientVersion(), proxyToTargetBrokerAddress); +return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, +getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null); } @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 16e47a0..31dcac1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -1116,7 +1116,6 @@ public class Commands { return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload); } -@VisibleForTesting public static int getCurrentProtocolVersion() { // Return the last ProtocolVersion enum value return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 232db43..c7fa786 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -59,16 +59,19 @@ public class DirectProxyHandler { private String originalPrincipal; private String clientAuthData; private String clientAuthMethod; +private int protocolVersion; public static final String TLS_HANDLER = "tls"; private final Authentication authentication; -public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) { +public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, +
[GitHub] merlimat closed pull request #2849: fixing/adding sql docs to correct locations
merlimat closed pull request #2849: fixing/adding sql docs to correct locations URL: https://github.com/apache/pulsar/pull/2849 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site2/docs/sql-deployment-configurations.md b/site2/docs/sql-deployment-configurations.md new file mode 100644 index 00..ac5814cf86 --- /dev/null +++ b/site2/docs/sql-deployment-configurations.md @@ -0,0 +1,152 @@ +--- +id: sql-deployment-configurations +title: Pulsar SQl Deployment and Configuration +sidebar_label: Deployment and Configuration +--- + +Below is a list configurations for the Presto Pulsar connector and instruction on how to deploy a cluster. + +## Presto Pulsar Connector Configurations +There are several configurations for the Presto Pulsar Connector. The properties file that contain these configurations can be found at ```${project.root}/conf/presto/catalog/pulsar.properties```. +The configurations for the connector and its default values are discribed below. + +```properties +# name of the connector to be displayed in the catalog +connector.name=pulsar + +# the url of Pulsar broker service +pulsar.broker-service-url=http://localhost:8080 + +# URI of Zookeeper cluster +pulsar.zookeeper-uri=localhost:2181 + +# minimum number of entries to read at a single time +pulsar.entry-read-batch-size=100 + +# default number of splits to use per query +pulsar.target-num-splits=4 +``` + +## Query Pulsar from Existing Presto Cluster + +If you already have an existing Presto cluster, you can copy Presto Pulsar connector plugin to your existing cluster. You can download the archived plugin package via: + +```bash +$ wget pulsar:binary_release_url +``` + +## Deploying a new cluster + +Please note that the [Getting Started](sql-getting-started.md) guide shows you how to easily setup a standalone single node enviroment to experiment with. + +Pulsar SQL is powered by [Presto](https://prestodb.io) thus many of the configurations for deployment is the same for the Pulsar SQL worker. + +You can use the same CLI args as the Presto launcher: + +```bash +$ ./bin/pulsar sql-worker --help +Usage: launcher [options] command + +Commands: run, start, stop, restart, kill, status + +Options: + -h, --helpshow this help message and exit + -v, --verbose Run verbosely + --etc-dir=DIR Defaults to INSTALL_PATH/etc + --launcher-config=FILE +Defaults to INSTALL_PATH/bin/launcher.properties + --node-config=FILEDefaults to ETC_DIR/node.properties + --jvm-config=FILE Defaults to ETC_DIR/jvm.config + --config=FILE Defaults to ETC_DIR/config.properties + --log-levels-file=FILE +Defaults to ETC_DIR/log.properties + --data-dir=DIRDefaults to INSTALL_PATH + --pid-file=FILE Defaults to DATA_DIR/var/run/launcher.pid + --launcher-log-file=FILE +Defaults to DATA_DIR/var/log/launcher.log (only in +daemon mode) + --server-log-file=FILE +Defaults to DATA_DIR/var/log/server.log (only in +daemon mode) + -D NAME=VALUE Set a Java system property + +``` + +There is a set of default configs for the cluster located in ```${project.root}/conf/presto``` that will be used by default. You can change them to customize your deployment + +You can also set the worker to read from a different configuration directory as well as set a different directory for writing its data: + +```bash +$ ./bin/pulsar sql-worker run --etc-dir /tmp/incubator-pulsar/conf/presto --data-dir /tmp/presto-1 +``` + +You can also start the worker as daemon process: + +```bash +$ ./bin sql-worker start +``` + +### Deploying to a 3 node cluster + +For example, if I wanted to deploy a Pulsar SQL/Presto cluster on 3 nodes, you can do the following: + +First, copy the Pulsar binary distribution to all three nodes. + +The first node, will run the Presto coordinator. The mininal configuration in ```${project.root}/conf/presto/config.properties``` can be the following + +```properties +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +query.max-memory=50GB +query.max-memory-per-node=1GB +discovery-server.enabled=true +discovery.uri= +``` + +Also, modify ```pulsar.broker-service-url``` and ```pulsar.zookeeper-uri``` configs in ```${project.root}/conf/presto/catalog/pulsar.properties``` on those nodes accordingly + +Afterwards, you can start the coordinator by just running + +```$ ./bin/pulsar sql-worker run``` + +For the other two nodes that will only serve as worker nodes, the configurations can be the following: + +```properties +coordinator=false +http-server.http.port=8080 +query.max-memory=50GB
[GitHub] merlimat closed pull request #2845: Make proxy advertise protocol version of client to broker
merlimat closed pull request #2845: Make proxy advertise protocol version of client to broker URL: https://github.com/apache/pulsar/pull/2845 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 9306a8248b..ccc20b3ec3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -110,6 +110,7 @@ private volatile int numberOfRejectRequests = 0; private final int maxNumberOfRejectedRequestPerConnection; private final int rejectedRequestResetTimeSec = 60; +private final int protocolVersion; private final long operationTimeoutMs; protected String proxyToTargetBrokerAddress = null; @@ -123,6 +124,10 @@ } public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { +this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion()); +} + +public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) { super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest()); this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true); @@ -135,6 +140,7 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { this.state = State.None; this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable(); this.hostnameVerifier = new DefaultHostnameVerifier(); +this.protocolVersion = protocolVersion; } @Override @@ -167,8 +173,8 @@ protected ByteBuf newConnectCommand() throws PulsarClientException { if (authentication.getAuthData().hasDataFromCommand()) { authData = authentication.getAuthData().getCommandData(); } -return Commands.newConnect(authentication.getAuthMethodName(), authData, -getPulsarClientVersion(), proxyToTargetBrokerAddress); +return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, +getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null); } @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 16e47a0148..31dcac190e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -1116,7 +1116,6 @@ private static ByteBufPair serializeCommandMessageWithSize(BaseCommand cmd, Byte return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload); } -@VisibleForTesting public static int getCurrentProtocolVersion() { // Return the last ProtocolVersion enum value return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 232db4307e..c7fa786123 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -59,16 +59,19 @@ private String originalPrincipal; private String clientAuthData; private String clientAuthMethod; +private int protocolVersion; public static final String TLS_HANDLER = "tls"; private final Authentication authentication; -public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) { +public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, +int protocolVersion) { this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); this.originalPrincipal = proxyConnection.clientAuthRole; this.clientAuthData = proxyConnection.clientAuthData; this.clientAuthMethod = proxyConnection.clientAuthMethod; +this.protocolVersion = protocolVersion; ProxyConfiguration config = service.getConfiguration(); // Start the connection attempt. @@ -97,7 +100,7 @@ protected void initChannel(SocketChannel ch) throws Exception { }
[pulsar] branch master updated: fixing/adding sql docs to correct locations (#2849)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b797d7b fixing/adding sql docs to correct locations (#2849) b797d7b is described below commit b797d7b9b416d21cff45e3b8f236edc18ac3a97e Author: Boyang Jerry Peng AuthorDate: Thu Oct 25 23:34:39 2018 -0700 fixing/adding sql docs to correct locations (#2849) --- site2/docs/sql-deployment-configurations.md| 152 site2/docs/sql-getting-started.md | 142 +++ site2/docs/sql-overview.md | 24 site2/website/sidebars.json| 5 + .../version-2.2.0/sql-deployment-configurations.md | 153 + .../version-2.2.0/sql-getting-started.md | 143 +++ .../versioned_docs/version-2.2.0/sql-overview.md | 25 .../versioned_sidebars/version-2.2.0-sidebars.json | 121 8 files changed, 765 insertions(+) diff --git a/site2/docs/sql-deployment-configurations.md b/site2/docs/sql-deployment-configurations.md new file mode 100644 index 000..ac5814c --- /dev/null +++ b/site2/docs/sql-deployment-configurations.md @@ -0,0 +1,152 @@ +--- +id: sql-deployment-configurations +title: Pulsar SQl Deployment and Configuration +sidebar_label: Deployment and Configuration +--- + +Below is a list configurations for the Presto Pulsar connector and instruction on how to deploy a cluster. + +## Presto Pulsar Connector Configurations +There are several configurations for the Presto Pulsar Connector. The properties file that contain these configurations can be found at ```${project.root}/conf/presto/catalog/pulsar.properties```. +The configurations for the connector and its default values are discribed below. + +```properties +# name of the connector to be displayed in the catalog +connector.name=pulsar + +# the url of Pulsar broker service +pulsar.broker-service-url=http://localhost:8080 + +# URI of Zookeeper cluster +pulsar.zookeeper-uri=localhost:2181 + +# minimum number of entries to read at a single time +pulsar.entry-read-batch-size=100 + +# default number of splits to use per query +pulsar.target-num-splits=4 +``` + +## Query Pulsar from Existing Presto Cluster + +If you already have an existing Presto cluster, you can copy Presto Pulsar connector plugin to your existing cluster. You can download the archived plugin package via: + +```bash +$ wget pulsar:binary_release_url +``` + +## Deploying a new cluster + +Please note that the [Getting Started](sql-getting-started.md) guide shows you how to easily setup a standalone single node enviroment to experiment with. + +Pulsar SQL is powered by [Presto](https://prestodb.io) thus many of the configurations for deployment is the same for the Pulsar SQL worker. + +You can use the same CLI args as the Presto launcher: + +```bash +$ ./bin/pulsar sql-worker --help +Usage: launcher [options] command + +Commands: run, start, stop, restart, kill, status + +Options: + -h, --helpshow this help message and exit + -v, --verbose Run verbosely + --etc-dir=DIR Defaults to INSTALL_PATH/etc + --launcher-config=FILE +Defaults to INSTALL_PATH/bin/launcher.properties + --node-config=FILEDefaults to ETC_DIR/node.properties + --jvm-config=FILE Defaults to ETC_DIR/jvm.config + --config=FILE Defaults to ETC_DIR/config.properties + --log-levels-file=FILE +Defaults to ETC_DIR/log.properties + --data-dir=DIRDefaults to INSTALL_PATH + --pid-file=FILE Defaults to DATA_DIR/var/run/launcher.pid + --launcher-log-file=FILE +Defaults to DATA_DIR/var/log/launcher.log (only in +daemon mode) + --server-log-file=FILE +Defaults to DATA_DIR/var/log/server.log (only in +daemon mode) + -D NAME=VALUE Set a Java system property + +``` + +There is a set of default configs for the cluster located in ```${project.root}/conf/presto``` that will be used by default. You can change them to customize your deployment + +You can also set the worker to read from a different configuration directory as well as set a different directory for writing its data: + +```bash +$ ./bin/pulsar sql-worker run --etc-dir /tmp/incubator-pulsar/conf/presto --data-dir /tmp/presto-1 +``` + +You can also start the worker as daemon process: + +```bash +$ ./bin sql-worker start +``` + +### Deploying to a 3 node cluster + +For example, if I wanted to deploy a Pulsar SQL/Presto cluster on 3 nodes, you can do the following: + +First, copy the Pulsar binary distribution to all three nodes. + +The first node, will run the Presto coordinator. The mininal configuration in
[pulsar] branch master updated: Add "dashboard" to list of modules so that pom version gets auto updated (#2838)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 519cbe9 Add "dashboard" to list of modules so that pom version gets auto updated (#2838) 519cbe9 is described below commit 519cbe905ef2f53cb51ac0cc136a912c2ef8a18f Author: Matteo Merli AuthorDate: Thu Oct 25 23:06:50 2018 -0700 Add "dashboard" to list of modules so that pom version gets auto updated (#2838) * Add "dashboard" to list of modules so that pom version gets auto updated * Fixed version * Fixed license exclusion for dashboard --- dashboard/pom.xml | 2 +- pom.xml | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dashboard/pom.xml b/dashboard/pom.xml index 8131fb4..f46ec05 100644 --- a/dashboard/pom.xml +++ b/dashboard/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.1.0-incubating-SNAPSHOT +2.3.0-SNAPSHOT ../docker 4.0.0 diff --git a/pom.xml b/pom.xml index cc8bc7d..eac83c0 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,7 @@ flexible messaging model and an intuitive client API. pulsar-zookeeper pulsar-log4j2-appender pulsar-sql +dashboard jclouds-shaded @@ -1035,7 +1036,7 @@ flexible messaging model and an intuitive client API. **/*.graffle **/*.hgrm **/CMakeFiles/** -dashboard/django/stats/migrations/*.py +**/django/stats/migrations/*.py site/vendor/** site/scripts/doxygen/** site/api/** @@ -1142,7 +1143,7 @@ flexible messaging model and an intuitive client API. **/META-INF/services/com.facebook.presto.spi.Plugin -dashboard/django/stats/migrations/*.py +**/django/stats/migrations/*.py **/conf/uwsgi_params
[GitHub] merlimat closed pull request #2838: Add "dashboard" to list of modules so that pom version gets auto updated
merlimat closed pull request #2838: Add "dashboard" to list of modules so that pom version gets auto updated URL: https://github.com/apache/pulsar/pull/2838 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/dashboard/pom.xml b/dashboard/pom.xml index 8131fb43eb..f46ec05165 100644 --- a/dashboard/pom.xml +++ b/dashboard/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.1.0-incubating-SNAPSHOT +2.3.0-SNAPSHOT ../docker 4.0.0 diff --git a/pom.xml b/pom.xml index cc8bc7d9a1..eac83c073f 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,7 @@ flexible messaging model and an intuitive client API. pulsar-zookeeper pulsar-log4j2-appender pulsar-sql +dashboard jclouds-shaded @@ -1035,7 +1036,7 @@ flexible messaging model and an intuitive client API. **/*.graffle **/*.hgrm **/CMakeFiles/** -dashboard/django/stats/migrations/*.py +**/django/stats/migrations/*.py site/vendor/** site/scripts/doxygen/** site/api/** @@ -1142,7 +1143,7 @@ flexible messaging model and an intuitive client API. **/META-INF/services/com.facebook.presto.spi.Plugin -dashboard/django/stats/migrations/*.py +**/django/stats/migrations/*.py **/conf/uwsgi_params This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on issue #2850: adding Python function instance unit test
jerrypeng commented on issue #2850: adding Python function instance unit test URL: https://github.com/apache/pulsar/pull/2850#issuecomment-433296762 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services