[GitHub] skyrocknroll opened a new pull request #2859: Fixed Typo
skyrocknroll opened a new pull request #2859: Fixed Typo URL: https://github.com/apache/pulsar/pull/2859 ### Motivation Explain here the context, and why you're making that change. What is the problem you're trying to solve. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] benjaminhuo commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?
benjaminhuo commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker? URL: https://github.com/apache/pulsar/issues/2858#issuecomment-433593105 Thanks for the explanation and suggestion. I'd like to contribute. Regards, Ben This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] cckellogg commented on a change in pull request #2855: Secretprovider interfaces and some default implementations
cckellogg commented on a change in pull request #2855: Secretprovider interfaces and some default implementations URL: https://github.com/apache/pulsar/pull/2855#discussion_r228702112 ## File path: pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.secretsproviderconfigurator; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import io.kubernetes.client.models.V1Container; +import io.kubernetes.client.models.V1EnvVar; +import io.kubernetes.client.models.V1EnvVarSource; +import io.kubernetes.client.models.V1SecretKeySelector; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; + +import java.lang.reflect.Type; +import java.util.Map; + +/** + * This file defines the SecretsProviderConfigurator that will be used by default for running in Kubernetes. + * As such this implementation is strictly when workers are configured to use kubernetes runtime. + * We use kubernetes in built secrets and bind them as environment variables within the function container + * to ensure that the secrets are availble to the function at runtime. Then we plug in the + * EnvironmentBasedSecretsConfig as the secrets provider who knows how to read these environment variables + */ +public class KubernetesSecretsProviderConfigurator implements SecretsProviderConfigurator { +@Override +public String getSecretsProviderClassName(Function.FunctionDetails functionDetails) { +switch (functionDetails.getRuntime()) { +case JAVA: +return EnvironmentBasedSecretsProvider.class.getName(); +case PYTHON: +return "secretsprovider.EnvironmentBasedSecretsProvider"; +default: +throw new RuntimeException("Unknown function runtime " + functionDetails.getRuntime()); +} +} + +@Override +public Map getSecretsProviderConfig(Function.FunctionDetails functionDetails) { +return null; +} + +@Override +public void configureKubernetesRuntimeSecretsProvider(V1Container container, Function.FunctionDetails functionDetails) { +if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { +Type type = new TypeToken>() { +}.getType(); +Map secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); +for (Map.Entry entry : secretsMap.entrySet()) { +final V1EnvVar secretEnv = new V1EnvVar(); +Map kv = (Map) entry.getValue(); +secretEnv.name(entry.getKey()) +.valueFrom(new V1EnvVarSource() +.secretKeyRef(new V1SecretKeySelector() + .name(kv.entrySet().iterator().next().getKey()) Review comment: should this look for specific keys in the Map? There should be some error checking? What if the first key is unexpected or unknown value? The provider should provide some validation of the secret object and not process it if it's not configured correctly. Can there be some unit tests with this too? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] skyrocknroll commented on issue #2792: support auto.offset.reset on the Pulsar adaptor for Apache Kafka
skyrocknroll commented on issue #2792: support auto.offset.reset on the Pulsar adaptor for Apache Kafka URL: https://github.com/apache/pulsar/issues/2792#issuecomment-433592430 @ivankelly https://kafka.apache.org/documentation.html#newconsumerconfigs . What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): *earliest*: automatically reset the offset to the earliest offset *latest*: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer's group anything else: throw exception to the consumer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?
sijie commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker? URL: https://github.com/apache/pulsar/issues/2858#issuecomment-433592171 @benjaminhuo > Official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker the helm chart was contributed from a production deployment in google cloud. so those values are set to pretty high. I think we can improve it by introducing 3 different groups of settings, e.g. small, medium and large. so people can choose the proper settings to deploy to their environment. I am wondering if you are interested in contributing a change to this helm chart :) > The manifest(bookie.yaml) deploy bookeeper as daemonset, why not statefulset with persistent volume claim? I think helm chart deploys bookkeeper as statefulset. https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/bookkeeper-statefulset.yaml You mean the kubernetes manifest here: https://github.com/apache/pulsar/blob/master/deployment/kubernetes/generic/bookie.yaml#L33 The reason why it is using daemonset for generic deployment is following: as a generic deployment, people might not enable any persistent volumes enabled in their k8s environment. so for they to tryout pulsar quickly, daemonset is an easy solution to start. In cloud environment, for example, aws and google cloud, we are recommending people deploying using statefulset. https://github.com/apache/pulsar/blob/master/deployment/kubernetes/aws/bookkeeper.yaml#L67 but since k8s is now supporting local volume claim, I think we can change generic deployment to use statefulset with local volume claim. wondering if you are interested in contributing to this as well? Hope this answer your question? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie closed pull request #2814: Fix memory issue in cpp ZTSClient
sijie closed pull request #2814: Fix memory issue in cpp ZTSClient URL: https://github.com/apache/pulsar/pull/2814 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc index 3097c30c65..94671241e2 100644 --- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc +++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc @@ -141,8 +141,16 @@ std::string ZTSClient::ybase64Encode(const unsigned char *input, int length) { } char *ZTSClient::base64Decode(const char *input) { -BIO *bio, *b64; +if (input == NULL) { +return NULL; +} + size_t length = strlen(input); +if (length == 0) { +return NULL; +} + +BIO *bio, *b64; char *result = (char *)malloc(length); bio = BIO_new_mem_buf((void *)input, -1); @@ -150,16 +158,21 @@ char *ZTSClient::base64Decode(const char *input) { bio = BIO_push(b64, bio); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); -BIO_read(bio, result, length); +int decodeStrLen = BIO_read(bio, result, length); BIO_free_all(bio); +if (decodeStrLen > 0) { +result[decodeStrLen] = '\0'; +return result; +} +free(result); -return result; +return NULL; } const std::string ZTSClient::getPrincipalToken() const { // construct unsigned principal token std::string unsignedTokenString = "v=S1"; -char host[BUFSIZ]; +char host[BUFSIZ] = {}; long long t = (long long)time(NULL); gethostname(host, sizeof(host)); @@ -176,8 +189,8 @@ const std::string ZTSClient::getPrincipalToken() const { // signing const char *unsignedToken = unsignedTokenString.c_str(); -unsigned char signature[BUFSIZ]; -unsigned char hash[SHA256_DIGEST_LENGTH]; +unsigned char signature[BUFSIZ] = {}; +unsigned char hash[SHA256_DIGEST_LENGTH] = {}; unsigned int siglen; FILE *fp; RSA *privateKey; @@ -189,14 +202,21 @@ const std::string ZTSClient::getPrincipalToken() const { } char *decodeStr = base64Decode(privateKeyUri_.data.c_str()); +if (decodeStr == NULL) { +LOG_ERROR("Failed to decode privateKey"); +return ""; +} + BIO *bio = BIO_new_mem_buf((void *)decodeStr, -1); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); if (bio == NULL) { LOG_ERROR("Failed to create key BIO"); +free(decodeStr); return ""; } privateKey = PEM_read_bio_RSAPrivateKey(bio, NULL, NULL, NULL); BIO_free(bio); +free(decodeStr); if (privateKey == NULL) { LOG_ERROR("Failed to load privateKey"); return ""; @@ -225,6 +245,8 @@ const std::string ZTSClient::getPrincipalToken() const { std::string principalToken = unsignedTokenString + ";s=" + ybase64Encode(signature, siglen); LOG_DEBUG("Created signed principal token: " << principalToken); +RSA_free(privateKey); + return principalToken; } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Fix memory issue in cpp ZTSClient (#2814)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c518247 Fix memory issue in cpp ZTSClient (#2814) c518247 is described below commit c518247d873bfe07364359e5f63f5fd7f689c0e9 Author: hrsakai AuthorDate: Sat Oct 27 14:09:27 2018 +0900 Fix memory issue in cpp ZTSClient (#2814) ### Modifications * Use `calloc` instead of `malloc` in order to add Termination character(`\0`). * Free memory allocated with `calloc` and `PEM_read_bio_RSAPrivateKey`. --- pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc | 34 +- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc index 3097c30..9467124 100644 --- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc +++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc @@ -141,8 +141,16 @@ std::string ZTSClient::ybase64Encode(const unsigned char *input, int length) { } char *ZTSClient::base64Decode(const char *input) { -BIO *bio, *b64; +if (input == NULL) { +return NULL; +} + size_t length = strlen(input); +if (length == 0) { +return NULL; +} + +BIO *bio, *b64; char *result = (char *)malloc(length); bio = BIO_new_mem_buf((void *)input, -1); @@ -150,16 +158,21 @@ char *ZTSClient::base64Decode(const char *input) { bio = BIO_push(b64, bio); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); -BIO_read(bio, result, length); +int decodeStrLen = BIO_read(bio, result, length); BIO_free_all(bio); +if (decodeStrLen > 0) { +result[decodeStrLen] = '\0'; +return result; +} +free(result); -return result; +return NULL; } const std::string ZTSClient::getPrincipalToken() const { // construct unsigned principal token std::string unsignedTokenString = "v=S1"; -char host[BUFSIZ]; +char host[BUFSIZ] = {}; long long t = (long long)time(NULL); gethostname(host, sizeof(host)); @@ -176,8 +189,8 @@ const std::string ZTSClient::getPrincipalToken() const { // signing const char *unsignedToken = unsignedTokenString.c_str(); -unsigned char signature[BUFSIZ]; -unsigned char hash[SHA256_DIGEST_LENGTH]; +unsigned char signature[BUFSIZ] = {}; +unsigned char hash[SHA256_DIGEST_LENGTH] = {}; unsigned int siglen; FILE *fp; RSA *privateKey; @@ -189,14 +202,21 @@ const std::string ZTSClient::getPrincipalToken() const { } char *decodeStr = base64Decode(privateKeyUri_.data.c_str()); +if (decodeStr == NULL) { +LOG_ERROR("Failed to decode privateKey"); +return ""; +} + BIO *bio = BIO_new_mem_buf((void *)decodeStr, -1); BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); if (bio == NULL) { LOG_ERROR("Failed to create key BIO"); +free(decodeStr); return ""; } privateKey = PEM_read_bio_RSAPrivateKey(bio, NULL, NULL, NULL); BIO_free(bio); +free(decodeStr); if (privateKey == NULL) { LOG_ERROR("Failed to load privateKey"); return ""; @@ -225,6 +245,8 @@ const std::string ZTSClient::getPrincipalToken() const { std::string principalToken = unsignedTokenString + ";s=" + ybase64Encode(signature, siglen); LOG_DEBUG("Created signed principal token: " << principalToken); +RSA_free(privateKey); + return principalToken; }
[pulsar] branch master updated: Added ability for the kubernetes to poll a configmap to look out for changes (#2856)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2d208c5 Added ability for the kubernetes to poll a configmap to look out for changes (#2856) 2d208c5 is described below commit 2d208c565e3bd25a1cc12d904f72b91f96ec6cab Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 22:07:54 2018 -0700 Added ability for the kubernetes to poll a configmap to look out for changes (#2856) ### Motivation While running functions worker in kubernetes, it might be hard to update any kind of config of the worker without killing it. This pr allows certain properties of the functions to be stored in a configmap. The KubernetesRuntime Factory will keep an eye for any changes and apply the changes on the fly dramatically simplifying any config updates --- .../runtime/KubernetesRuntimeFactory.java | 126 +++-- .../functions/runtime/KubernetesRuntimeTest.java | 4 +- .../functions/worker/FunctionRuntimeManager.java | 4 +- .../pulsar/functions/worker/WorkerConfig.java | 5 + 4 files changed, 101 insertions(+), 38 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 1a180ae..c6d5d02 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -24,13 +24,20 @@ import io.kubernetes.client.ApiClient; import io.kubernetes.client.Configuration; import io.kubernetes.client.apis.AppsV1Api; import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.models.V1ConfigMap; import io.kubernetes.client.util.Config; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; +import java.lang.reflect.Field; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -40,24 +47,33 @@ import static org.apache.commons.lang3.StringUtils.isEmpty; @Slf4j public class KubernetesRuntimeFactory implements RuntimeFactory { -private final String k8Uri; -private final String jobNamespace; -private final String pulsarDockerImageName; -private final String pulsarRootDir; +@Getter +@Setter +@NoArgsConstructor +class KubernetesInfo { +private String k8Uri; +private String jobNamespace; +private String pulsarDockerImageName; +private String pulsarRootDir; +private String pulsarAdminUrl; +private String pulsarServiceUrl; +private String pythonDependencyRepository; +private String pythonExtraDependencyRepository; +private String changeConfigMap; +private String changeConfigMapNamespace; +} +private final KubernetesInfo kubernetesInfo; private final Boolean submittingInsidePod; private final Boolean installUserCodeDependencies; -private final String pythonDependencyRepository; -private final String pythonExtraDependencyRepository; private final Map customLabels; -private final String pulsarAdminUri; -private final String pulsarServiceUri; +private final Integer expectedMetricsCollectionInterval; private final String stateStorageServiceUri; private final AuthenticationConfig authConfig; private final String javaInstanceJarFile; private final String pythonInstanceFile; private final String prometheusMetricsServerJarFile; private final String logDirectory = "logs/functions"; -private final Integer expectedMetricsInterval; +private Timer changeConfigMapTimer; private AppsV1Api appsClient; private CoreV1Api coreClient; @@ -75,36 +91,41 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { String pulsarAdminUri, String stateStorageServiceUri, AuthenticationConfig authConfig, -Integer expectedMetricsInterval) { -this.k8Uri = k8Uri; +Integer expectedMetricsCollectionInterval, +String changeConfigMap, +String changeConfigMapNamespace) { +this.kubernetesInfo = new KubernetesInfo(); +this.kubernetesInfo.setK8Uri(k8Uri); if (
[GitHub] sijie closed pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes
sijie closed pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes URL: https://github.com/apache/pulsar/pull/2856 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 1a180aee0c..c6d5d02645 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -24,13 +24,20 @@ import io.kubernetes.client.Configuration; import io.kubernetes.client.apis.AppsV1Api; import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.models.V1ConfigMap; import io.kubernetes.client.util.Config; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; +import java.lang.reflect.Field; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -40,24 +47,33 @@ @Slf4j public class KubernetesRuntimeFactory implements RuntimeFactory { -private final String k8Uri; -private final String jobNamespace; -private final String pulsarDockerImageName; -private final String pulsarRootDir; +@Getter +@Setter +@NoArgsConstructor +class KubernetesInfo { +private String k8Uri; +private String jobNamespace; +private String pulsarDockerImageName; +private String pulsarRootDir; +private String pulsarAdminUrl; +private String pulsarServiceUrl; +private String pythonDependencyRepository; +private String pythonExtraDependencyRepository; +private String changeConfigMap; +private String changeConfigMapNamespace; +} +private final KubernetesInfo kubernetesInfo; private final Boolean submittingInsidePod; private final Boolean installUserCodeDependencies; -private final String pythonDependencyRepository; -private final String pythonExtraDependencyRepository; private final Map customLabels; -private final String pulsarAdminUri; -private final String pulsarServiceUri; +private final Integer expectedMetricsCollectionInterval; private final String stateStorageServiceUri; private final AuthenticationConfig authConfig; private final String javaInstanceJarFile; private final String pythonInstanceFile; private final String prometheusMetricsServerJarFile; private final String logDirectory = "logs/functions"; -private final Integer expectedMetricsInterval; +private Timer changeConfigMapTimer; private AppsV1Api appsClient; private CoreV1Api coreClient; @@ -75,36 +91,41 @@ public KubernetesRuntimeFactory(String k8Uri, String pulsarAdminUri, String stateStorageServiceUri, AuthenticationConfig authConfig, -Integer expectedMetricsInterval) { -this.k8Uri = k8Uri; +Integer expectedMetricsCollectionInterval, +String changeConfigMap, +String changeConfigMapNamespace) { +this.kubernetesInfo = new KubernetesInfo(); +this.kubernetesInfo.setK8Uri(k8Uri); if (!isEmpty(jobNamespace)) { -this.jobNamespace = jobNamespace; +this.kubernetesInfo.setJobNamespace(jobNamespace); } else { -this.jobNamespace = "default"; +this.kubernetesInfo.setJobNamespace("default"); } if (!isEmpty(pulsarDockerImageName)) { -this.pulsarDockerImageName = pulsarDockerImageName; + this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName); } else { -this.pulsarDockerImageName = "apachepulsar/pulsar"; + this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar"); } if (!isEmpty(pulsarRootDir)) { -this.pulsarRootDir = pulsarRootDir; +this.kubernetesInfo.setPulsarRootDir(pulsarRootDir); } else { -this.pulsarRootDir = "/pulsar"; +this.kubernetesInfo.setPulsarRootDir("/pulsar"); } + this.kuberne
[pulsar] branch master updated: Reduce memory reserved for prometheus (#2857)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new eb747c4 Reduce memory reserved for prometheus (#2857) eb747c4 is described below commit eb747c48e44ae7861771565c8e46f8ed6c8d6e48 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 22:06:42 2018 -0700 Reduce memory reserved for prometheus (#2857) --- .../java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 80eb840..7f3f72d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -66,7 +66,7 @@ class KubernetesRuntime implements Runtime { private static final Integer GRPC_PORT = 9093; private static final Integer PROMETHEUS_PORT = 9094; private static final Double prometheusMetricsServerCpu = 0.1; -private static final Long prometheusMetricsServerRam = 25000l; +private static final Long prometheusMetricsServerRam = 12500l; public static final Pattern VALID_POD_NAME_REGEX = Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*", Pattern.CASE_INSENSITIVE);
[GitHub] sijie closed pull request #2857: Reduce memory reserved for prometheus
sijie closed pull request #2857: Reduce memory reserved for prometheus URL: https://github.com/apache/pulsar/pull/2857 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 80eb8400af..7f3f72d44b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -66,7 +66,7 @@ private static final Integer GRPC_PORT = 9093; private static final Integer PROMETHEUS_PORT = 9094; private static final Double prometheusMetricsServerCpu = 0.1; -private static final Long prometheusMetricsServerRam = 25000l; +private static final Long prometheusMetricsServerRam = 12500l; public static final Pattern VALID_POD_NAME_REGEX = Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*", Pattern.CASE_INSENSITIVE); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] benjaminhuo opened a new issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?
benjaminhuo opened a new issue #2858: What official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker? URL: https://github.com/apache/pulsar/issues/2858 Expected behavior The memory should be set to a reasonable size Actual behavior Tell us what happens instead Official pulsar helm chart request 15GB memory for bookeeper,zookeeper and broker ![snip20181027_103](https://user-images.githubusercontent.com/18525465/47598862-1cd85080-d9d5-11e8-8201-451d6c377486.png) ![snip20181027_104](https://user-images.githubusercontent.com/18525465/47598863-1cd85080-d9d5-11e8-8728-46d4cb3b6d42.png) ![snip20181027_107](https://user-images.githubusercontent.com/18525465/47598864-1cd85080-d9d5-11e8-89c3-8d8b43343c2b.png) ![snip20181027_108](https://user-images.githubusercontent.com/18525465/47598865-1d70e700-d9d5-11e8-8c26-0f718f609173.png) Steps to reproduce System configuration **Pulsar version**: x.y This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2857: Reduce memory reserved for prometheus
srkukarni opened a new pull request #2857: Reduce memory reserved for prometheus URL: https://github.com/apache/pulsar/pull/2857 ### Motivation Explain here the context, and why you're making that change. What is the problem you're trying to solve. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes
srkukarni opened a new pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes URL: https://github.com/apache/pulsar/pull/2856 ### Motivation While running functions worker in kubernetes, it might be hard to update any kind of config of the worker without killing it. This pr allows certain properties of the functions to be stored in a configmap. The KubernetesRuntime Factory will keep an eye for any changes and apply the changes on the fly dramatically simplifying any config updates ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #2855: Secretprovider interfaces and some default implementations
srkukarni opened a new pull request #2855: Secretprovider interfaces and some default implementations URL: https://github.com/apache/pulsar/pull/2855 ### Motivation This pr defines the SecretsProviderConfigurator that configures the function instances/containers and plugs in the right SecretsProvider for the instances to use. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2855: Secretprovider interfaces and some default implementations
srkukarni commented on issue #2855: Secretprovider interfaces and some default implementations URL: https://github.com/apache/pulsar/pull/2855#issuecomment-433562824 @cckellogg This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Secrets Frontend (#2853)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 82b267b Secrets Frontend (#2853) 82b267b is described below commit 82b267ba9d59fa677d5b4fdcc48e38de6c59dee9 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 14:49:44 2018 -0700 Secrets Frontend (#2853) * Secrets Frontend * Do not expose secrets in cli --- .../pulsar/common/functions/FunctionConfig.java| 5 ++ .../org/apache/pulsar/common/io/SinkConfig.java| 5 ++ .../org/apache/pulsar/common/io/SourceConfig.java | 5 ++ .../instance/src/main/python/Function_pb2.py | 76 -- .../proto/src/main/proto/Function.proto| 1 + .../functions/utils/FunctionConfigUtils.java | 10 +++ .../pulsar/functions/utils/SinkConfigUtils.java| 8 +++ .../pulsar/functions/utils/SourceConfigUtils.java | 9 +++ 8 files changed, 85 insertions(+), 34 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index ea5866a..2163df1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -79,6 +79,11 @@ public class FunctionConfig { private ProcessingGuarantees processingGuarantees; private boolean retainOrdering; private Map userConfig; +// This is a map of secretName(aka how the secret is going to be +// accessed in the function via context) to an object that +// encapsulates how the secret is fetched by the underlying +// secrets provider +private Map secrets; private Runtime runtime; private boolean autoAck; private int maxMessageRetries = -1; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index c4bf1ca..355c696 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -55,6 +55,11 @@ public class SinkConfig { private Map inputSpecs = new TreeMap<>(); private Map configs; +// This is a map of secretName(aka how the secret is going to be +// accessed in the function via context) to an object that +// encapsulates how the secret is fetched by the underlying +// secrets provider +private Map secrets; private int parallelism = 1; private FunctionConfig.ProcessingGuarantees processingGuarantees; private boolean retainOrdering; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 57c0f79..9dbe97c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -46,6 +46,11 @@ public class SourceConfig { private String schemaType; private Map configs; +// This is a map of secretName(aka how the secret is going to be +// accessed in the function via context) to an object that +// encapsulates how the secret is fetched by the underlying +// secrets provider +private Map secrets; private int parallelism = 1; private FunctionConfig.ProcessingGuarantees processingGuarantees; private Resources resources; diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index b09e105..d6cc8af 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -18,6 +18,7 @@ # # Generated by the protocol buffer compiler. DO NOT EDIT! +# Generated by the protocol buffer compiler. DO NOT EDIT! # source: Function.proto import sys @@ -39,7 +40,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xd4\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433541994 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #2852: Add connection timeout client configuration option
merlimat commented on issue #2852: Add connection timeout client configuration option URL: https://github.com/apache/pulsar/pull/2852#issuecomment-433535510 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat opened a new pull request #2854: Fixed ZLib decompression in C++ client
merlimat opened a new pull request #2854: Fixed ZLib decompression in C++ client URL: https://github.com/apache/pulsar/pull/2854 ### Motivation The ZLib method `uncompress()` is not able to process data that was compressed from Pulsar Java producer with ZLib. This is due to the the initialization settings that are used inside the `uncompress()` functions and that are different from how ZLib is used within the JDK. This causes C++ (and derivatives) consumers to drop messages that were compressed by Java producer. A Java consumer is able to correctly decompress both. ### Modifications * Fixed decompression in C++ to be able to inflate both zstream terminations * Added option in C++ perf producer to compress messages This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433526803 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433515376 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433515089 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
merlimat closed pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index e084b04622..7f1b5aa45c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -358,7 +358,7 @@ public void testProxyStats() throws Exception { + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/"; final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic; -System.out.println(consumerUri+", "+producerUri); +System.out.println(consumerUri + ", " + producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); URI readUri = URI.create(readerUri); @@ -424,6 +424,47 @@ public void testProxyStats() throws Exception { } } +@Test(timeOut = 1) +public void consumeMessagesInPartitionedTopicTest() throws Exception { +final String namespace = "my-property/my-ns"; +final String topic = namespace + "/" + "my-topic7"; +admin.topics().createPartitionedTopic("persistent://" + topic, 3); + +final String subscription = "my-sub"; +final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; +final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic; + +URI consumeUri = URI.create(consumerUri); +URI produceUri = URI.create(producerUri); + +WebSocketClient consumeClient = new WebSocketClient(); +WebSocketClient produceClient = new WebSocketClient(); + +SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); +SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + +try { +produceClient.start(); +ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); +Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); +producerFuture.get(); +produceSocket.sendMessage(100); +} finally { +stopWebSocketClient(produceClient); +} + +Thread.sleep(500); + +try { +consumeClient.start(); +ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); +Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); +consumerFuture.get(); +} finally { +stopWebSocketClient(consumeClient); +} +} + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java index 1bb3e086ed..4d38aa949d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java @@ -22,6 +22,7 @@ import java.io.Serializable; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; /** * Opaque unique identifier of a single message @@ -49,6 +50,10 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return MessageIdImpl.fromByteArray(data); } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +return MessageIdImpl.fromByteArrayWithTopic(data, topicName); +} + public static final MessageId earliest = new MessageIdImpl(-1, -1, -1); public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 3686b124a8..5a53436e7b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.MessageId; imp
[GitHub] merlimat commented on issue #1070: Provide a nodejs client
merlimat commented on issue #1070: Provide a nodejs client URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433491319 @mujimu There are indeed plans to start soon working on this. Thanks for the link. If you have JS experience it would be also good to have feedback on API ergonomics once we start working on it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Fix Websocket Consume Messages in Partitioned Topics (#2829)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d79499d Fix Websocket Consume Messages in Partitioned Topics (#2829) d79499d is described below commit d79499dbe24fbd8b065489875ba86a47761ec33f Author: Yuto Furuta AuthorDate: Sat Oct 27 02:53:09 2018 +0900 Fix Websocket Consume Messages in Partitioned Topics (#2829) * fix consume messages in partitioned topics on websocket * add consumeMessagesInPartitionedTopicTest * add fromByteArrayWithTopic * remove public --- .../websocket/proxy/ProxyPublishConsumeTest.java | 43 +- .../org/apache/pulsar/client/api/MessageId.java| 5 +++ .../apache/pulsar/client/impl/MessageIdImpl.java | 31 .../apache/pulsar/websocket/ConsumerHandler.java | 3 +- 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index e084b04..7f1b5aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -358,7 +358,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/"; final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic; -System.out.println(consumerUri+", "+producerUri); +System.out.println(consumerUri + ", " + producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); URI readUri = URI.create(readerUri); @@ -424,6 +424,47 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } } +@Test(timeOut = 1) +public void consumeMessagesInPartitionedTopicTest() throws Exception { +final String namespace = "my-property/my-ns"; +final String topic = namespace + "/" + "my-topic7"; +admin.topics().createPartitionedTopic("persistent://" + topic, 3); + +final String subscription = "my-sub"; +final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; +final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic; + +URI consumeUri = URI.create(consumerUri); +URI produceUri = URI.create(producerUri); + +WebSocketClient consumeClient = new WebSocketClient(); +WebSocketClient produceClient = new WebSocketClient(); + +SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); +SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + +try { +produceClient.start(); +ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); +Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); +producerFuture.get(); +produceSocket.sendMessage(100); +} finally { +stopWebSocketClient(produceClient); +} + +Thread.sleep(500); + +try { +consumeClient.start(); +ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); +Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); +consumerFuture.get(); +} finally { +stopWebSocketClient(consumeClient); +} +} + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java index 1bb3e08..4d38aa9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.Serializable; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; /** * Opaque unique identifier of a single message @@ -49,6 +50,10 @@ public interface MessageId extends Comparable, Serializable { return MessageIdImpl.fromByteArray(data); } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +return MessageIdImpl.
[GitHub] jerrypeng commented on a change in pull request #2826: Secrets Interface
jerrypeng commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228607992 ## File path: pulsar-functions/secretsprovider/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.secretsproviderconfigurator; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import io.kubernetes.client.models.V1Container; +import io.kubernetes.client.models.V1EnvVar; +import io.kubernetes.client.models.V1EnvVarSource; +import io.kubernetes.client.models.V1SecretKeySelector; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; + +import java.lang.reflect.Type; +import java.util.Map; + +/** + * Context provides contextual information to the executing function. + * Features like which message id we are handling, whats the topic name of the + * message, what are our operating constraints, etc can be accessed by the + * executing function + */ +public class KubernetesSecretsProviderConfigurator implements SecretsProviderConfigurator { +@Override +public void init(Map config) { + +} + +@Override +public String getSecretsProviderClassName(Function.FunctionDetails functionDetails) { +switch (functionDetails.getRuntime()) { +case JAVA: +return EnvironmentBasedSecretsProvider.class.getName(); +case PYTHON: +return "secretsprovider.EnvironmentBasedSecretsProvider"; +default: +throw new RuntimeException("Unknown function runtime " + functionDetails.getRuntime()); +} +} + +@Override +public Map getSecretsProviderConfig(Function.FunctionDetails functionDetails) { +return null; +} + +@Override +public void configureKubernetesRuntimeSecretsProvider(V1Container container, Function.FunctionDetails functionDetails) { +if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { +Type type = new TypeToken>() { +}.getType(); +Map secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); Review comment: Perhaps its worthwhile to explicitly define the structure of the "Object" for kubernetes so it is clear what configs should be there. You can just probably declare a class that represents the Object This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #2826: Secrets Interface
jerrypeng commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228606081 ## File path: pulsar-functions/secretsprovider/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java ## @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.secretsprovider; + +import java.util.Map; + +/** + * Context provides contextual information to the executing function. Review comment: I don't think this is the right comment This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: adding Python function instance unit test (#2850)
This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1a1cf22 adding Python function instance unit test (#2850) 1a1cf22 is described below commit 1a1cf226a718fbaece393e8e1ed9584b59a8752b Author: Boyang Jerry Peng AuthorDate: Fri Oct 26 10:14:01 2018 -0700 adding Python function instance unit test (#2850) * adding python function instance unittests * adding license headers --- bin/pulsar | 2 +- bin/pulsar-admin | 2 +- distribution/server/src/assemble/bin.xml | 2 +- pulsar-client-cpp/run-unit-tests.sh| 4 ++ pulsar-functions/instance/pom.xml | 31 .../src/scripts/run_python_instance_tests.sh | 30 +++ .../src/test/python/test_python_instance.py| 58 ++ pulsar-functions/runtime/pom.xml | 28 --- 8 files changed, 126 insertions(+), 31 deletions(-) diff --git a/bin/pulsar b/bin/pulsar index 9cacfe7..02a81a0 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -94,7 +94,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then echo "\nCouldn't find pulsar-functions python instance."; echo "Make sure you've run 'mvn package'\n"; diff --git a/bin/pulsar-admin b/bin/pulsar-admin index 1a1339d..39d1239 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -130,7 +130,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} else diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml index c9ad893..d17d4ff 100644 --- a/distribution/server/src/assemble/bin.xml +++ b/distribution/server/src/assemble/bin.xml @@ -44,7 +44,7 @@ ${basedir}/licenses - ${basedir}/../../pulsar-functions/runtime/target/python-instance + ${basedir}/../../pulsar-functions/instance/target/python-instance instances/python-instance diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh index ed4b6b4..cffe002 100755 --- a/pulsar-client-cpp/run-unit-tests.sh +++ b/pulsar-client-cpp/run-unit-tests.sh @@ -88,6 +88,10 @@ if [ $RES -eq 0 ]; then python pulsar_test.py RES=$? +echo " Running Python Function Instance unit tests" +bash /pulsar/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh +RES=$? + popd popd diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index e206b11..1c8f3ab 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -103,4 +103,35 @@ + + + + +org.apache.maven.plugins +maven-antrun-plugin + + +compile + + run + + + +building python instance + + + + + + + + + + + + + + diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh new file mode 100644 index 000..9b33c24 --- /dev/null +++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on
[pulsar] branch master updated: Cleanup Logging for python functions (#2847)
This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fa212bc Cleanup Logging for python functions (#2847) fa212bc is described below commit fa212bc8bb0cbd81729b498b6041cde794be9684 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 10:13:39 2018 -0700 Cleanup Logging for python functions (#2847) * Cleanup Logging for python functions * More statements to debug * More debug statements * More debug --- .../instance/src/main/python/python_instance.py| 8 ++--- .../instance/src/main/python/server.py | 8 ++--- pulsar-functions/instance/src/main/python/util.py | 37 ++ 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 03dafae..5f1645d 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -187,7 +187,7 @@ class PythonInstance(object): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, @@ -201,7 +201,7 @@ class PythonInstance(object): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: self.consumers[topic] = self.pulsar_client.subscribe( re.compile(str(topic)), subscription_name, @@ -237,7 +237,7 @@ class PythonInstance(object): Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() def actual_execution(self): -Log.info("Started Thread for executing the function") +Log.debug("Started Thread for executing the function") while True: msg = self.queue.get(True) if isinstance(msg, InternalQuitMessage): @@ -321,7 +321,7 @@ class PythonInstance(object): def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: - Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 611a737..58d43d2 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -35,20 +35,20 @@ class InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr self.pyinstance = pyinstance def GetFunctionStatus(self, request, context): -Log.info("Came in GetFunctionStatus") +Log.debug("Came in GetFunctionStatus") return self.pyinstance.get_function_status() def GetAndResetMetrics(self, request, context): -Log.info("Came in GetAndResetMetrics") +Log.debug("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() def ResetMetrics(self, request, context): -Log.info("Came in ResetMetrics") +Log.debug("Came in ResetMetrics") self.pyinstance.reset_metrics() return request def GetMetrics(self, request, context): -Log.info("Came in GetMetrics") +Log.debug("Came in GetMetrics") return self.pyinstance.get_metrics() def HealthCheck(self, request, context): diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 4736457..56c1ce1 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -36,38 +36,33 @@ PULSAR_FUNCTIONS_API_ROOT = 'functions' def import_class(from_path, full_class_name): from_path = str(from_path) full_class_name = str(full_class_name) - kclass = import_class_from_path(from_path, full_class_name) - if kclass is None: + try: +re
[GitHub] jerrypeng closed pull request #2850: adding Python function instance unit test
jerrypeng closed pull request #2850: adding Python function instance unit test URL: https://github.com/apache/pulsar/pull/2850 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/pulsar b/bin/pulsar index 9cacfe7782..02a81a04ef 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -94,7 +94,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then echo "\nCouldn't find pulsar-functions python instance."; echo "Make sure you've run 'mvn package'\n"; diff --git a/bin/pulsar-admin b/bin/pulsar-admin index 1a1339d8ab..39d12393b2 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -130,7 +130,7 @@ fi # find the python instance location if [ ! -f "${PY_INSTANCE_FILE}" ]; then # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py" + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} else diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml index c9ad893d56..d17d4ff39f 100644 --- a/distribution/server/src/assemble/bin.xml +++ b/distribution/server/src/assemble/bin.xml @@ -44,7 +44,7 @@ ${basedir}/licenses - ${basedir}/../../pulsar-functions/runtime/target/python-instance + ${basedir}/../../pulsar-functions/instance/target/python-instance instances/python-instance diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh index ed4b6b4a1c..cffe0023b7 100755 --- a/pulsar-client-cpp/run-unit-tests.sh +++ b/pulsar-client-cpp/run-unit-tests.sh @@ -88,6 +88,10 @@ if [ $RES -eq 0 ]; then python pulsar_test.py RES=$? +echo " Running Python Function Instance unit tests" +bash /pulsar/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh +RES=$? + popd popd diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml index e206b111cb..1c8f3ab78e 100644 --- a/pulsar-functions/instance/pom.xml +++ b/pulsar-functions/instance/pom.xml @@ -103,4 +103,35 @@ + + + + +org.apache.maven.plugins +maven-antrun-plugin + + +compile + + run + + + +building python instance + + + + + + + + + + + + + + diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh new file mode 100644 index 00..9b33c24481 --- /dev/null +++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# Make sure dependencies are installed +pip install mock --user +pip install protobuf --user + +CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +PULSAR_HOME=$CUR_DIR/../../../../ + +# run instance tests +PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests \ No newline at end of file diff --git a/pulsar-functions/instance/src/test/python/test_python_instanc
[GitHub] jerrypeng closed pull request #2847: Cleanup Logging for python functions
jerrypeng closed pull request #2847: Cleanup Logging for python functions URL: https://github.com/apache/pulsar/pull/2847 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 03dafaef2f..5f1645dd74 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -187,7 +187,7 @@ def run(self): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, @@ -201,7 +201,7 @@ def run(self): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: self.consumers[topic] = self.pulsar_client.subscribe( re.compile(str(topic)), subscription_name, @@ -237,7 +237,7 @@ def run(self): Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() def actual_execution(self): -Log.info("Started Thread for executing the function") +Log.debug("Started Thread for executing the function") while True: msg = self.queue.get(True) if isinstance(msg, InternalQuitMessage): @@ -321,7 +321,7 @@ def setup_output_serde(self): def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: - Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 611a737031..58d43d204d 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -35,20 +35,20 @@ def __init__(self, pyinstance): self.pyinstance = pyinstance def GetFunctionStatus(self, request, context): -Log.info("Came in GetFunctionStatus") +Log.debug("Came in GetFunctionStatus") return self.pyinstance.get_function_status() def GetAndResetMetrics(self, request, context): -Log.info("Came in GetAndResetMetrics") +Log.debug("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() def ResetMetrics(self, request, context): -Log.info("Came in ResetMetrics") +Log.debug("Came in ResetMetrics") self.pyinstance.reset_metrics() return request def GetMetrics(self, request, context): -Log.info("Came in GetMetrics") +Log.debug("Came in GetMetrics") return self.pyinstance.get_metrics() def HealthCheck(self, request, context): diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 4736457bbc..56c1ce1c9f 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -36,38 +36,33 @@ def import_class(from_path, full_class_name): from_path = str(from_path) full_class_name = str(full_class_name) - kclass = import_class_from_path(from_path, full_class_name) - if kclass is None: + try: +return import_class_from_path(from_path, full_class_name) + except Exception as e: our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe( api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT) -kclass = import_class_from_path(api_dir, full_class_name) - return kclass +try: + return import_class_from_path(api_dir, full_class_name) +except Exception as e: + Log.info("Failed to import class %s from path %s" % (full_class_name, from_path)) + Log.info(e, exc_info=True) + return None def import_class_from_path(from_path, full_
[GitHub] srkukarni opened a new pull request #2853: Secrets Frontend
srkukarni opened a new pull request #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853 ### Motivation This pr defines the FunctionConfig part of secrets implementation. It defines how users can define secrets in their function/source/sink configs and how they get translated to FunctionDetails. Follow up prs to actually implement secrets will follow this one. ### Modifications Describe the modifications you've done. ### Result After your change, what will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2853: Secrets Frontend
srkukarni commented on issue #2853: Secrets Frontend URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433473480 @cckellogg Please take a look. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2826: Secrets Interface
srkukarni commented on issue #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#issuecomment-433470126 I will break the change into smaller changes and it might be best to comment on each individual pr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] mujimu commented on issue #1070: Provide a nodejs client
mujimu commented on issue #1070: Provide a nodejs client URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433457173 more recent example of wrapping an underlying C++ lib as a node library: https://github.com/Blizzard/node-rdkafka This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] mujimu commented on issue #1070: Provide a nodejs client
mujimu commented on issue #1070: Provide a nodejs client URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433455906 bump This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] cckellogg commented on a change in pull request #2826: Secrets Interface
cckellogg commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228576463 ## File path: pulsar-functions/instance/src/main/python/secretsprovider.py ## @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# -*- encoding: utf-8 -*- + +"""secretsprovider.py: Interfaces and definitions for Secret Providers +""" +from abc import abstractmethod +import os + +class SecretsProvider: Review comment: I think this ok now i think about it more. More implementations can be added later This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] cckellogg commented on a change in pull request #2826: Secrets Interface
cckellogg commented on a change in pull request #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#discussion_r228569533 ## File path: pulsar-functions/instance/src/main/python/secretsprovider.py ## @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# -*- encoding: utf-8 -*- + +"""secretsprovider.py: Interfaces and definitions for Secret Providers +""" +from abc import abstractmethod +import os + +class SecretsProvider: Review comment: I think the implementation for these should be provider specific. Like KuberberneteSecretProvider and that provider know how to read the secret in the container. Same for java. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 in 2.1.1-incubating
rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 in 2.1.1-incubating URL: https://github.com/apache/pulsar/issues/2795#issuecomment-432384264 I though this was a bug but it turns out that it was an issue with how to enable authorization. I can't remember exactly where but the documentation was a bit confusing on which properties to enable for authentication vs. authorization. You will get this error if the following property is not set in your broker.conf files ```sh # Configuration to enable Authorization authorizationEnabled=true ``` Once you bounce the broker with this update, the broker log file should contain a line similar to: ```sh [main] INFO org.apache.pulsar.broker.authorization.AuthorizationService - org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider has been loaded. ``` Then you can run your command again and it should succeed: ```sh ./pulsar-admin namespaces grant-permission public/pulsar-gke/default --actions produce,consume --role oisaac ``` I am using v2.1.1-incubating and it works like a champ. I hope this helps! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 in 2.1.1-incubating
rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 in 2.1.1-incubating URL: https://github.com/apache/pulsar/issues/2795#issuecomment-432384264 I though this was a bug but it turns out that it was an issue with how to enable authorization. I can't remember exactly where but the documentation was a bit confusing in which properties to enable for authentication vs. authorization. You will get this error if the following property is not set in your broker.conf files ```sh # Configuration to enable Authorization authorizationEnabled=true ``` Once you bounce the broker with this update, the broker log file should contain a line similar to: ```sh [main] INFO org.apache.pulsar.broker.authorization.AuthorizationService - org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider has been loaded. ``` I am using v2.1.1-incubating and it works like a champ. I hope this helps! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on issue #2852: Add connection timeout client configuration option
ivankelly commented on issue #2852: Add connection timeout client configuration option URL: https://github.com/apache/pulsar/pull/2852#issuecomment-433399375 @cckellogg This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly opened a new pull request #2852: Add connection timeout client configuration option
ivankelly opened a new pull request #2852: Add connection timeout client configuration option URL: https://github.com/apache/pulsar/pull/2852 Allows the client to specify how long to wait for brokers to respond. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2826: Secrets Interface
srkukarni commented on issue #2826: Secrets Interface URL: https://github.com/apache/pulsar/pull/2826#issuecomment-433365590 @merlimat @cckellogg I've made the changes as requested. Please take a look again This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni closed pull request #2851: Use the right field to get the filename
srkukarni closed pull request #2851: Use the right field to get the filename URL: https://github.com/apache/pulsar/pull/2851 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 087238c6af..89e3b48620 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -243,8 +243,8 @@ public Response registerFunction(final String tenant, final String namespace, fi Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace()); } else { packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, -fileDetail.getName())); - packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName()); +fileDetail.getFileName())); + packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace()); } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Use the right field to get the filename (#2851)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 138c863 Use the right field to get the filename (#2851) 138c863 is described below commit 138c863af0a43bd3423a7f3fb3846d2cb8dfe472 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 02:57:03 2018 -0700 Use the right field to get the filename (#2851) --- .../org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 087238c..89e3b48 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -243,8 +243,8 @@ public class FunctionsImpl { Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace()); } else { packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, -fileDetail.getName())); - packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName()); +fileDetail.getFileName())); + packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace()); }
[pulsar] branch master updated: Make IdentitySerde handle bytes as well (#2848)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b867907 Make IdentitySerde handle bytes as well (#2848) b867907 is described below commit b8679078dc6632795c4d81cff9caedd316444f17 Author: Sanjeev Kulkarni AuthorDate: Fri Oct 26 02:56:45 2018 -0700 Make IdentitySerde handle bytes as well (#2848) * Make IdentitySerde handle bytes as well * Added more information about type of object passed --- pulsar-client-cpp/python/pulsar/functions/serde.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py b/pulsar-client-cpp/python/pulsar/functions/serde.py index ee6366c..968c1c9 100644 --- a/pulsar-client-cpp/python/pulsar/functions/serde.py +++ b/pulsar-client-cpp/python/pulsar/functions/serde.py @@ -75,7 +75,9 @@ class IdentitySerDe(SerDe): def serialize(self, input): if type(input) in self._types: return str(input).encode('utf-8') -raise TypeError +if type(input) == bytes: + return input +raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) def deserialize(self, input_bytes): for typ in self._types: @@ -83,4 +85,4 @@ class IdentitySerDe(SerDe): return typ(input_bytes.decode('utf-8')) except: pass -raise TypeError +return input_bytes
[GitHub] srkukarni closed pull request #2848: Make IdentitySerde handle bytes as well
srkukarni closed pull request #2848: Make IdentitySerde handle bytes as well URL: https://github.com/apache/pulsar/pull/2848 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py b/pulsar-client-cpp/python/pulsar/functions/serde.py index ee6366cf63..968c1c9bc3 100644 --- a/pulsar-client-cpp/python/pulsar/functions/serde.py +++ b/pulsar-client-cpp/python/pulsar/functions/serde.py @@ -75,7 +75,9 @@ def __init__(self): def serialize(self, input): if type(input) in self._types: return str(input).encode('utf-8') -raise TypeError +if type(input) == bytes: + return input +raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) def deserialize(self, input_bytes): for typ in self._types: @@ -83,4 +85,4 @@ def deserialize(self, input_bytes): return typ(input_bytes.decode('utf-8')) except: pass -raise TypeError +return input_bytes This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829#discussion_r228456096 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java ## @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return messageId; } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +checkNotNull(data); +ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length)); +PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder(); + +PulsarApi.MessageIdData idData; +try { +idData = builder.mergeFrom(inputStream, null).build(); +} catch (UninitializedMessageException e) { +throw new IOException(e); +} + +MessageId messageId; +if (idData.hasBatchIndex()) { +messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), +idData.getBatchIndex()); +} else { +messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); +} +if (idData.getPartition() > -1 && topicName != null) { Review comment: Oh, we need to pass it to the constructor... sounds odd though. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829#discussion_r228455442 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java ## @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return messageId; } +public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { +checkNotNull(data); +ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length)); +PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder(); + +PulsarApi.MessageIdData idData; +try { +idData = builder.mergeFrom(inputStream, null).build(); +} catch (UninitializedMessageException e) { +throw new IOException(e); +} + +MessageId messageId; +if (idData.hasBatchIndex()) { +messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), +idData.getBatchIndex()); +} else { +messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); +} +if (idData.getPartition() > -1 && topicName != null) { Review comment: It seems like we can merge these two if statements. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829#discussion_r228454245 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java ## @@ -31,7 +31,7 @@ private final String topicName; private final MessageId messageId; -TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { +public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { Review comment: Is this public still needed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] maskit commented on issue #2814: Fix memory issue in cpp ZTSClient
maskit commented on issue #2814: Fix memory issue in cpp ZTSClient URL: https://github.com/apache/pulsar/pull/2814#issuecomment-47349 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2851: Use the right field to get the filename
srkukarni commented on issue #2851: Use the right field to get the filename URL: https://github.com/apache/pulsar/pull/2851#issuecomment-41679 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2848: Make IdentitySerde handle bytes as well
srkukarni commented on issue #2848: Make IdentitySerde handle bytes as well URL: https://github.com/apache/pulsar/pull/2848#issuecomment-41483 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services