[GitHub] skyrocknroll opened a new pull request #2859: Fixed Typo

2018-10-26 Thread GitBox
skyrocknroll opened a new pull request #2859: Fixed Typo
URL: https://github.com/apache/pulsar/pull/2859
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] benjaminhuo commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?

2018-10-26 Thread GitBox
benjaminhuo commented on issue #2858: What official pulsar helm chart request 
15GB memory for bookeeper,zookeeper and broker?
URL: https://github.com/apache/pulsar/issues/2858#issuecomment-433593105
 
 
   Thanks for the explanation and suggestion.
   I'd like to contribute.
   
   Regards,
   Ben




[GitHub] cckellogg commented on a change in pull request #2855: Secretprovider interfaces and some default implementations

2018-10-26 Thread GitBox
cckellogg commented on a change in pull request #2855: Secretprovider 
interfaces and some default implementations
URL: https://github.com/apache/pulsar/pull/2855#discussion_r228702112
 
 

 ##
 File path: pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
 ##
 @@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.secretsproviderconfigurator;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1EnvVar;
+import io.kubernetes.client.models.V1EnvVarSource;
+import io.kubernetes.client.models.V1SecretKeySelector;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * This file defines the SecretsProviderConfigurator that will be used by default for running in Kubernetes.
+ * As such, this implementation applies strictly when workers are configured to use the Kubernetes runtime.
+ * We use Kubernetes built-in secrets and bind them as environment variables within the function container
+ * to ensure that the secrets are available to the function at runtime. Then we plug in the
+ * EnvironmentBasedSecretsConfig as the secrets provider, which knows how to read these environment variables.
+ */
+public class KubernetesSecretsProviderConfigurator implements SecretsProviderConfigurator {
+    @Override
+    public String getSecretsProviderClassName(Function.FunctionDetails functionDetails) {
+        switch (functionDetails.getRuntime()) {
+            case JAVA:
+                return EnvironmentBasedSecretsProvider.class.getName();
+            case PYTHON:
+                return "secretsprovider.EnvironmentBasedSecretsProvider";
+            default:
+                throw new RuntimeException("Unknown function runtime " + functionDetails.getRuntime());
+        }
+    }
+
+    @Override
+    public Map getSecretsProviderConfig(Function.FunctionDetails functionDetails) {
+        return null;
+    }
+
+    @Override
+    public void configureKubernetesRuntimeSecretsProvider(V1Container container, Function.FunctionDetails functionDetails) {
+        if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
+            Type type = new TypeToken>() {
+            }.getType();
+            Map secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type);
+            for (Map.Entry entry : secretsMap.entrySet()) {
+                final V1EnvVar secretEnv = new V1EnvVar();
+                Map kv = (Map) entry.getValue();
+                secretEnv.name(entry.getKey())
+                        .valueFrom(new V1EnvVarSource()
+                                .secretKeyRef(new V1SecretKeySelector()
+                                        .name(kv.entrySet().iterator().next().getKey())
 
 Review comment:
   Should this look for specific keys in the Map? There should be some error checking. What if the first key is unexpected or has an unknown value? The provider should validate the secret object and not process it if it is not configured correctly. Can there be some unit tests for this too?
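
A minimal sketch of the kind of validation asked for here. `SecretSpecValidator` and the field names `path` and `key` are assumptions made for illustration; the PR does not state which keys a secret entry must carry. The idea is to reject a malformed entry before any environment variable is built from it, rather than taking whatever key comes first in the map.

```java
import java.util.Map;

/**
 * Hypothetical validator for one entry of a function's secrets map.
 * The field names "path" and "key" are assumptions made for this sketch.
 */
final class SecretSpecValidator {
    static final String PATH_FIELD = "path"; // assumed: name of the Kubernetes Secret
    static final String KEY_FIELD = "key";   // assumed: key inside that Secret

    private SecretSpecValidator() {
    }

    /** Throws IllegalArgumentException unless spec is a map with exactly the two expected string fields. */
    static void validate(String secretName, Object spec) {
        if (!(spec instanceof Map)) {
            throw new IllegalArgumentException("Secret '" + secretName + "' must be a map");
        }
        Map<?, ?> kv = (Map<?, ?>) spec;
        for (String field : new String[] {PATH_FIELD, KEY_FIELD}) {
            Object value = kv.get(field);
            if (!(value instanceof String) || ((String) value).trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Secret '" + secretName + "' needs a non-empty string field '" + field + "'");
            }
        }
        if (kv.size() != 2) {
            throw new IllegalArgumentException(
                    "Secret '" + secretName + "' has unexpected fields: " + kv.keySet());
        }
    }
}
```

A unit test would then only need to feed well-formed and malformed maps to `validate` and check which ones throw.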




[GitHub] skyrocknroll commented on issue #2792: support auto.offset.reset on the Pulsar adaptor for Apache Kafka

2018-10-26 Thread GitBox
skyrocknroll commented on issue #2792: support auto.offset.reset on the Pulsar 
adaptor for Apache Kafka
URL: https://github.com/apache/pulsar/issues/2792#issuecomment-433592430
 
 
   @ivankelly https://kafka.apache.org/documentation.html#newconsumerconfigs .
   What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
   * *earliest*: automatically reset the offset to the earliest offset
   * *latest*: automatically reset the offset to the latest offset
   * *none*: throw exception to the consumer if no previous offset is found for the consumer's group
   * anything else: throw exception to the consumer.
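
The four cases above can be sketched as a small enum. This is only an illustration of the semantics quoted from the Kafka documentation: `OffsetResetStrategy`, `parse` and `resolve` are hypothetical names, not part of the Pulsar adaptor or of Kafka, and the exception types are placeholders.

```java
/** Sketch of the reset policies quoted above; all names here are illustrative. */
enum OffsetResetStrategy {
    EARLIEST, LATEST, NONE;

    /** Maps the configured string to a strategy; anything else is rejected. */
    static OffsetResetStrategy parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("auto.offset.reset must be set");
        }
        switch (value) {
            case "earliest":
                return EARLIEST;
            case "latest":
                return LATEST;
            case "none":
                return NONE;
            default:
                throw new IllegalArgumentException("Invalid value for auto.offset.reset: " + value);
        }
    }

    /**
     * Picks the starting position. A committed offset always wins; the
     * strategy only applies when there is none (committed == null).
     */
    long resolve(Long committed, long earliest, long latest) {
        if (committed != null) {
            return committed;
        }
        switch (this) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                throw new IllegalStateException("No previous offset found and auto.offset.reset is none");
        }
    }
}
```

For example, `OffsetResetStrategy.parse("earliest").resolve(null, 5L, 42L)` yields the earliest position, while the same call with `"none"` throws.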




[GitHub] sijie commented on issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?

2018-10-26 Thread GitBox
sijie commented on issue #2858: What official pulsar helm chart request 15GB 
memory for bookeeper,zookeeper and broker?
URL: https://github.com/apache/pulsar/issues/2858#issuecomment-433592171
 
 
   @benjaminhuo 
   
   > Official pulsar helm chart request 15GB memory for bookeeper,zookeeper and 
broker
   
   The helm chart was contributed from a production deployment in Google Cloud, so those values are set pretty high. I think we can improve it by introducing three groups of settings, e.g. small, medium and large, so people can choose the proper settings for their environment. I am wondering if you are interested in contributing a change to this helm chart :)
   
   > The manifest(bookie.yaml) deploy bookeeper as daemonset, why not 
statefulset with persistent volume claim?
   
   I think helm chart deploys bookkeeper as statefulset. 
https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/bookkeeper-statefulset.yaml
   
   You mean the kubernetes manifest here: 
https://github.com/apache/pulsar/blob/master/deployment/kubernetes/generic/bookie.yaml#L33
   
   The reason it uses a daemonset for the generic deployment is the following: in a generic deployment, people might not have any persistent volumes enabled in their k8s environment, so for them to try out Pulsar quickly, a daemonset is an easy way to start.
   
   In cloud environments, for example AWS and Google Cloud, we recommend deploying with a statefulset.
   
   
https://github.com/apache/pulsar/blob/master/deployment/kubernetes/aws/bookkeeper.yaml#L67
   
   But since k8s now supports local volume claims, I think we can change the generic deployment to use a statefulset with a local volume claim. Would you be interested in contributing this as well?
   
   Hope this answers your question.




[GitHub] sijie closed pull request #2814: Fix memory issue in cpp ZTSClient

2018-10-26 Thread GitBox
sijie closed pull request #2814: Fix memory issue in cpp ZTSClient
URL: https://github.com/apache/pulsar/pull/2814
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 3097c30c65..94671241e2 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -141,8 +141,16 @@ std::string ZTSClient::ybase64Encode(const unsigned char *input, int length) {
 }
 
 char *ZTSClient::base64Decode(const char *input) {
-BIO *bio, *b64;
+if (input == NULL) {
+return NULL;
+}
+
 size_t length = strlen(input);
+if (length == 0) {
+return NULL;
+}
+
+BIO *bio, *b64;
 char *result = (char *)malloc(length);
 
 bio = BIO_new_mem_buf((void *)input, -1);
@@ -150,16 +158,21 @@ char *ZTSClient::base64Decode(const char *input) {
 bio = BIO_push(b64, bio);
 
 BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL);
-BIO_read(bio, result, length);
+int decodeStrLen = BIO_read(bio, result, length);
 BIO_free_all(bio);
+if (decodeStrLen > 0) {
+result[decodeStrLen] = '\0';
+return result;
+}
+free(result);
 
-return result;
+return NULL;
 }
 
 const std::string ZTSClient::getPrincipalToken() const {
 // construct unsigned principal token
 std::string unsignedTokenString = "v=S1";
-char host[BUFSIZ];
+char host[BUFSIZ] = {};
 long long t = (long long)time(NULL);
 
 gethostname(host, sizeof(host));
@@ -176,8 +189,8 @@ const std::string ZTSClient::getPrincipalToken() const {
 
 // signing
 const char *unsignedToken = unsignedTokenString.c_str();
-unsigned char signature[BUFSIZ];
-unsigned char hash[SHA256_DIGEST_LENGTH];
+unsigned char signature[BUFSIZ] = {};
+unsigned char hash[SHA256_DIGEST_LENGTH] = {};
 unsigned int siglen;
 FILE *fp;
 RSA *privateKey;
@@ -189,14 +202,21 @@ const std::string ZTSClient::getPrincipalToken() const {
 }
 char *decodeStr = base64Decode(privateKeyUri_.data.c_str());
 
+if (decodeStr == NULL) {
+LOG_ERROR("Failed to decode privateKey");
+return "";
+}
+
 BIO *bio = BIO_new_mem_buf((void *)decodeStr, -1);
 BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL);
 if (bio == NULL) {
 LOG_ERROR("Failed to create key BIO");
+free(decodeStr);
 return "";
 }
 privateKey = PEM_read_bio_RSAPrivateKey(bio, NULL, NULL, NULL);
 BIO_free(bio);
+free(decodeStr);
 if (privateKey == NULL) {
 LOG_ERROR("Failed to load privateKey");
 return "";
@@ -225,6 +245,8 @@ const std::string ZTSClient::getPrincipalToken() const {
 std::string principalToken = unsignedTokenString + ";s=" + ybase64Encode(signature, siglen);
 LOG_DEBUG("Created signed principal token: " << principalToken);
 
+RSA_free(privateKey);
+
 return principalToken;
 }
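
The patch above is C++. As a rough Java analogue of the same defensive pattern (reject null or empty input, check the result of the decode, and never hand back a half-valid buffer), a minimal sketch might look like the following. `SafeBase64` is a hypothetical name used only for illustration, and returning `Optional.empty()` stands in for the `NULL` return in the C++ code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

/** Hypothetical helper mirroring the null/empty/failed-decode checks of the patch. */
final class SafeBase64 {
    private SafeBase64() {
    }

    /** Returns the decoded text, or empty if the input is null, empty, or not valid Base64. */
    static Optional<String> decode(String input) {
        if (input == null || input.isEmpty()) {
            return Optional.empty();
        }
        try {
            byte[] decoded = Base64.getDecoder().decode(input);
            if (decoded.length == 0) {
                return Optional.empty();
            }
            return Optional.of(new String(decoded, StandardCharsets.UTF_8));
        } catch (IllegalArgumentException e) {
            // The JDK decoder signals malformed input with IllegalArgumentException.
            return Optional.empty();
        }
    }
}
```

The caller is then forced to handle the failure case explicitly, which is what the added `decodeStr == NULL` check does in `getPrincipalToken()`.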
 


 




[pulsar] branch master updated: Fix memory issue in cpp ZTSClient (#2814)

2018-10-26 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new c518247  Fix memory issue in cpp ZTSClient (#2814)
c518247 is described below

commit c518247d873bfe07364359e5f63f5fd7f689c0e9
Author: hrsakai 
AuthorDate: Sat Oct 27 14:09:27 2018 +0900

Fix memory issue in cpp ZTSClient (#2814)

### Modifications

* Use `calloc` instead of `malloc` in order to add the termination character (`\0`).
* Free memory allocated with `calloc` and `PEM_read_bio_RSAPrivateKey`.
---
 pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc | 34 +-
 1 file changed, 28 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 3097c30..9467124 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -141,8 +141,16 @@ std::string ZTSClient::ybase64Encode(const unsigned char *input, int length) {
 }
 
 char *ZTSClient::base64Decode(const char *input) {
-BIO *bio, *b64;
+if (input == NULL) {
+return NULL;
+}
+
 size_t length = strlen(input);
+if (length == 0) {
+return NULL;
+}
+
+BIO *bio, *b64;
 char *result = (char *)malloc(length);
 
 bio = BIO_new_mem_buf((void *)input, -1);
@@ -150,16 +158,21 @@ char *ZTSClient::base64Decode(const char *input) {
 bio = BIO_push(b64, bio);
 
 BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL);
-BIO_read(bio, result, length);
+int decodeStrLen = BIO_read(bio, result, length);
 BIO_free_all(bio);
+if (decodeStrLen > 0) {
+result[decodeStrLen] = '\0';
+return result;
+}
+free(result);
 
-return result;
+return NULL;
 }
 
 const std::string ZTSClient::getPrincipalToken() const {
 // construct unsigned principal token
 std::string unsignedTokenString = "v=S1";
-char host[BUFSIZ];
+char host[BUFSIZ] = {};
 long long t = (long long)time(NULL);
 
 gethostname(host, sizeof(host));
@@ -176,8 +189,8 @@ const std::string ZTSClient::getPrincipalToken() const {
 
 // signing
 const char *unsignedToken = unsignedTokenString.c_str();
-unsigned char signature[BUFSIZ];
-unsigned char hash[SHA256_DIGEST_LENGTH];
+unsigned char signature[BUFSIZ] = {};
+unsigned char hash[SHA256_DIGEST_LENGTH] = {};
 unsigned int siglen;
 FILE *fp;
 RSA *privateKey;
@@ -189,14 +202,21 @@ const std::string ZTSClient::getPrincipalToken() const {
 }
 char *decodeStr = base64Decode(privateKeyUri_.data.c_str());
 
+if (decodeStr == NULL) {
+LOG_ERROR("Failed to decode privateKey");
+return "";
+}
+
 BIO *bio = BIO_new_mem_buf((void *)decodeStr, -1);
 BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL);
 if (bio == NULL) {
 LOG_ERROR("Failed to create key BIO");
+free(decodeStr);
 return "";
 }
 privateKey = PEM_read_bio_RSAPrivateKey(bio, NULL, NULL, NULL);
 BIO_free(bio);
+free(decodeStr);
 if (privateKey == NULL) {
 LOG_ERROR("Failed to load privateKey");
 return "";
@@ -225,6 +245,8 @@ const std::string ZTSClient::getPrincipalToken() const {
 std::string principalToken = unsignedTokenString + ";s=" + ybase64Encode(signature, siglen);
 LOG_DEBUG("Created signed principal token: " << principalToken);
 
+RSA_free(privateKey);
+
 return principalToken;
 }
 



[pulsar] branch master updated: Added ability for the kubernetes to poll a configmap to look out for changes (#2856)

2018-10-26 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 2d208c5  Added ability for the kubernetes to poll a configmap to look out for changes (#2856)
2d208c5 is described below

commit 2d208c565e3bd25a1cc12d904f72b91f96ec6cab
Author: Sanjeev Kulkarni 
AuthorDate: Fri Oct 26 22:07:54 2018 -0700

Added ability for the kubernetes to poll a configmap to look out for changes (#2856)

### Motivation
While running the functions worker in Kubernetes, it can be hard to update any part of the worker's config without killing it. This PR allows certain properties of the functions to be stored in a configmap. The KubernetesRuntimeFactory will watch for changes and apply them on the fly, dramatically simplifying config updates.
---
 .../runtime/KubernetesRuntimeFactory.java  | 126 +++--
 .../functions/runtime/KubernetesRuntimeTest.java   |   4 +-
 .../functions/worker/FunctionRuntimeManager.java   |   4 +-
 .../pulsar/functions/worker/WorkerConfig.java  |   5 +
 4 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 1a180ae..c6d5d02 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -24,13 +24,20 @@ import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.AppsV1Api;
 import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 
+import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
@@ -40,24 +47,33 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 @Slf4j
 public class KubernetesRuntimeFactory implements RuntimeFactory {
 
-private final String k8Uri;
-private final String jobNamespace;
-private final String pulsarDockerImageName;
-private final String pulsarRootDir;
+@Getter
+@Setter
+@NoArgsConstructor
+class KubernetesInfo {
+private String k8Uri;
+private String jobNamespace;
+private String pulsarDockerImageName;
+private String pulsarRootDir;
+private String pulsarAdminUrl;
+private String pulsarServiceUrl;
+private String pythonDependencyRepository;
+private String pythonExtraDependencyRepository;
+private String changeConfigMap;
+private String changeConfigMapNamespace;
+}
+private final KubernetesInfo kubernetesInfo;
 private final Boolean submittingInsidePod;
 private final Boolean installUserCodeDependencies;
-private final String pythonDependencyRepository;
-private final String pythonExtraDependencyRepository;
 private final Map customLabels;
-private final String pulsarAdminUri;
-private final String pulsarServiceUri;
+private final Integer expectedMetricsCollectionInterval;
 private final String stateStorageServiceUri;
 private final AuthenticationConfig authConfig;
 private final String javaInstanceJarFile;
 private final String pythonInstanceFile;
 private final String prometheusMetricsServerJarFile;
 private final String logDirectory = "logs/functions";
-private final Integer expectedMetricsInterval;
+private Timer changeConfigMapTimer;
 private AppsV1Api appsClient;
 private CoreV1Api coreClient;
 
@@ -75,36 +91,41 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
 String pulsarAdminUri,
 String stateStorageServiceUri,
 AuthenticationConfig authConfig,
-Integer expectedMetricsInterval) {
-this.k8Uri = k8Uri;
+Integer expectedMetricsCollectionInterval,
+String changeConfigMap,
+String changeConfigMapNamespace) {
+this.kubernetesInfo = new KubernetesInfo();
+this.kubernetesInfo.setK8Uri(k8Uri);
 if (

[GitHub] sijie closed pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes

2018-10-26 Thread GitBox
sijie closed pull request #2856: Added ability for the kubernetes to poll a 
configmap to look out for changes
URL: https://github.com/apache/pulsar/pull/2856
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 1a180aee0c..c6d5d02645 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -24,13 +24,20 @@
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.AppsV1Api;
 import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 
+import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
@@ -40,24 +47,33 @@
 @Slf4j
 public class KubernetesRuntimeFactory implements RuntimeFactory {
 
-private final String k8Uri;
-private final String jobNamespace;
-private final String pulsarDockerImageName;
-private final String pulsarRootDir;
+@Getter
+@Setter
+@NoArgsConstructor
+class KubernetesInfo {
+private String k8Uri;
+private String jobNamespace;
+private String pulsarDockerImageName;
+private String pulsarRootDir;
+private String pulsarAdminUrl;
+private String pulsarServiceUrl;
+private String pythonDependencyRepository;
+private String pythonExtraDependencyRepository;
+private String changeConfigMap;
+private String changeConfigMapNamespace;
+}
+private final KubernetesInfo kubernetesInfo;
 private final Boolean submittingInsidePod;
 private final Boolean installUserCodeDependencies;
-private final String pythonDependencyRepository;
-private final String pythonExtraDependencyRepository;
 private final Map customLabels;
-private final String pulsarAdminUri;
-private final String pulsarServiceUri;
+private final Integer expectedMetricsCollectionInterval;
 private final String stateStorageServiceUri;
 private final AuthenticationConfig authConfig;
 private final String javaInstanceJarFile;
 private final String pythonInstanceFile;
 private final String prometheusMetricsServerJarFile;
 private final String logDirectory = "logs/functions";
-private final Integer expectedMetricsInterval;
+private Timer changeConfigMapTimer;
 private AppsV1Api appsClient;
 private CoreV1Api coreClient;
 
@@ -75,36 +91,41 @@ public KubernetesRuntimeFactory(String k8Uri,
 String pulsarAdminUri,
 String stateStorageServiceUri,
 AuthenticationConfig authConfig,
-Integer expectedMetricsInterval) {
-this.k8Uri = k8Uri;
+Integer expectedMetricsCollectionInterval,
+String changeConfigMap,
+String changeConfigMapNamespace) {
+this.kubernetesInfo = new KubernetesInfo();
+this.kubernetesInfo.setK8Uri(k8Uri);
 if (!isEmpty(jobNamespace)) {
-this.jobNamespace = jobNamespace;
+this.kubernetesInfo.setJobNamespace(jobNamespace);
 } else {
-this.jobNamespace = "default";
+this.kubernetesInfo.setJobNamespace("default");
 }
 if (!isEmpty(pulsarDockerImageName)) {
-this.pulsarDockerImageName = pulsarDockerImageName;
+this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName);
 } else {
-this.pulsarDockerImageName = "apachepulsar/pulsar";
+this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
 }
 if (!isEmpty(pulsarRootDir)) {
-this.pulsarRootDir = pulsarRootDir;
+this.kubernetesInfo.setPulsarRootDir(pulsarRootDir);
 } else {
-this.pulsarRootDir = "/pulsar";
+this.kubernetesInfo.setPulsarRootDir("/pulsar");
 }
+
this.kuberne

[pulsar] branch master updated: Reduce memory reserved for prometheus (#2857)

2018-10-26 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new eb747c4  Reduce memory reserved for prometheus (#2857)
eb747c4 is described below

commit eb747c48e44ae7861771565c8e46f8ed6c8d6e48
Author: Sanjeev Kulkarni 
AuthorDate: Fri Oct 26 22:06:42 2018 -0700

Reduce memory reserved for prometheus (#2857)
---
 .../java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 80eb840..7f3f72d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -66,7 +66,7 @@ class KubernetesRuntime implements Runtime {
 private static final Integer GRPC_PORT = 9093;
 private static final Integer PROMETHEUS_PORT = 9094;
 private static final Double prometheusMetricsServerCpu = 0.1;
-private static final Long prometheusMetricsServerRam = 25000l;
+private static final Long prometheusMetricsServerRam = 12500l;
 public static final Pattern VALID_POD_NAME_REGEX =
 
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
 Pattern.CASE_INSENSITIVE);



[GitHub] sijie closed pull request #2857: Reduce memory reserved for prometheus

2018-10-26 Thread GitBox
sijie closed pull request #2857: Reduce memory reserved for prometheus
URL: https://github.com/apache/pulsar/pull/2857
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 80eb8400af..7f3f72d44b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -66,7 +66,7 @@
 private static final Integer GRPC_PORT = 9093;
 private static final Integer PROMETHEUS_PORT = 9094;
 private static final Double prometheusMetricsServerCpu = 0.1;
-private static final Long prometheusMetricsServerRam = 25000l;
+private static final Long prometheusMetricsServerRam = 12500l;
 public static final Pattern VALID_POD_NAME_REGEX =
 
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
 Pattern.CASE_INSENSITIVE);


 




[GitHub] benjaminhuo opened a new issue #2858: What official pulsar helm chart request 15GB memory for bookeeper, zookeeper and broker?

2018-10-26 Thread GitBox
benjaminhuo opened a new issue #2858: What official pulsar helm chart request 
15GB memory for bookeeper,zookeeper and broker?
URL: https://github.com/apache/pulsar/issues/2858
 
 
    Expected behavior
   The memory should be set to a reasonable size
   
    Actual behavior
   
   Tell us what happens instead
   The official Pulsar helm chart requests 15GB of memory each for bookkeeper, zookeeper and broker.
   
   
![snip20181027_103](https://user-images.githubusercontent.com/18525465/47598862-1cd85080-d9d5-11e8-8201-451d6c377486.png)
   
   
![snip20181027_104](https://user-images.githubusercontent.com/18525465/47598863-1cd85080-d9d5-11e8-8728-46d4cb3b6d42.png)
   
   
![snip20181027_107](https://user-images.githubusercontent.com/18525465/47598864-1cd85080-d9d5-11e8-89c3-8d8b43343c2b.png)
   
   
![snip20181027_108](https://user-images.githubusercontent.com/18525465/47598865-1d70e700-d9d5-11e8-8c26-0f718f609173.png)
   
   
    Steps to reproduce
   
    System configuration
   **Pulsar version**: x.y
   
   
   




[GitHub] srkukarni opened a new pull request #2857: Reduce memory reserved for prometheus

2018-10-26 Thread GitBox
srkukarni opened a new pull request #2857: Reduce memory reserved for prometheus
URL: https://github.com/apache/pulsar/pull/2857
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   




[GitHub] srkukarni opened a new pull request #2856: Added ability for the kubernetes to poll a configmap to look out for changes

2018-10-26 Thread GitBox
srkukarni opened a new pull request #2856: Added ability for the kubernetes to 
poll a configmap to look out for changes
URL: https://github.com/apache/pulsar/pull/2856
 
 
   
   ### Motivation
   While running the functions worker in Kubernetes, it can be hard to update any part of the worker's config without killing it. This PR allows certain properties of the functions to be stored in a configmap. The KubernetesRuntimeFactory will watch for changes and apply them on the fly, dramatically simplifying config updates.
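
A minimal sketch of the polling idea described above, built on `java.util.Timer` and `java.util.TimerTask` (which the diff imports). `ConfigPoller`, the `Supplier` standing in for the ConfigMap read, and the `Consumer` standing in for "apply the change" are all assumptions of this sketch, not the code in the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical poller: re-reads a key/value source on a timer and reports changes. */
final class ConfigPoller {
    private final Supplier<Map<String, String>> source;   // stand-in for reading the ConfigMap
    private final Consumer<Map<String, String>> onChange; // stand-in for applying new settings
    private Map<String, String> lastSeen = new HashMap<>();
    private Timer timer;

    ConfigPoller(Supplier<Map<String, String>> source, Consumer<Map<String, String>> onChange) {
        this.source = source;
        this.onChange = onChange;
    }

    /** Reads the source once and invokes the callback only if the data differs from the last read. */
    synchronized boolean pollOnce() {
        Map<String, String> current = source.get();
        if (current == null || current.equals(lastSeen)) {
            return false;
        }
        lastSeen = new HashMap<>(current);
        onChange.accept(lastSeen);
        return true;
    }

    /** Starts periodic polling on a daemon timer thread; calling it twice has no extra effect. */
    synchronized void start(long periodMillis) {
        if (timer != null) {
            return;
        }
        timer = new Timer("config-poller", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    pollOnce();
                } catch (RuntimeException e) {
                    // Swallow so that one failed read does not cancel the timer thread.
                }
            }
        }, 0L, periodMillis);
    }

    synchronized void stop() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }
}
```

Keeping the comparison in `pollOnce()` separate from the scheduling makes the change detection easy to unit test without waiting on a timer.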
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   




[GitHub] srkukarni opened a new pull request #2855: Secretprovider interfaces and some default implementations

2018-10-26 Thread GitBox
srkukarni opened a new pull request #2855: Secretprovider interfaces and some 
default implementations
URL: https://github.com/apache/pulsar/pull/2855
 
 
   ### Motivation
   
   This pr defines the SecretsProviderConfigurator that configures the function 
instances/containers and plugs in the right SecretsProvider for the instances 
to use.
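
   As a rough illustration of what a SecretsProvider on the instance side might 
look like, here is a minimal Python sketch of an environment-based provider. 
The method name `provide_secret` and the injectable `environ` argument are 
assumptions made for this example, not the interface defined in this PR.

```python
import os
from typing import Mapping, Optional


class EnvironmentBasedSecretsProvider:
    """Resolves a secret by reading an environment variable of the same name.

    Sketch only: the method name and signature are assumptions.
    """

    def __init__(self, environ: Optional[Mapping[str, str]] = None) -> None:
        # Allow injecting a mapping so the provider can be exercised in tests.
        self._environ = os.environ if environ is None else environ

    def provide_secret(self, secret_name: str) -> Optional[str]:
        """Return the secret value, or None if it is not set."""
        return self._environ.get(secret_name)
```

   With this split, the configurator's job is to make sure the container 
actually has the variable set (for example from a Kubernetes secret), while the 
function only ever asks the provider for a secret by name.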
   




[GitHub] srkukarni commented on issue #2855: Secretprovider interfaces and some default implementations

2018-10-26 Thread GitBox
srkukarni commented on issue #2855: Secretprovider interfaces and some default 
implementations
URL: https://github.com/apache/pulsar/pull/2855#issuecomment-433562824
 
 
   @cckellogg 




[pulsar] branch master updated: Secrets Frontend (#2853)

2018-10-26 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 82b267b  Secrets Frontend (#2853)
82b267b is described below

commit 82b267ba9d59fa677d5b4fdcc48e38de6c59dee9
Author: Sanjeev Kulkarni 
AuthorDate: Fri Oct 26 14:49:44 2018 -0700

Secrets Frontend (#2853)

* Secrets Frontend

* Do not expose secrets in cli
---
 .../pulsar/common/functions/FunctionConfig.java|  5 ++
 .../org/apache/pulsar/common/io/SinkConfig.java|  5 ++
 .../org/apache/pulsar/common/io/SourceConfig.java  |  5 ++
 .../instance/src/main/python/Function_pb2.py   | 76 --
 .../proto/src/main/proto/Function.proto|  1 +
 .../functions/utils/FunctionConfigUtils.java   | 10 +++
 .../pulsar/functions/utils/SinkConfigUtils.java|  8 +++
 .../pulsar/functions/utils/SourceConfigUtils.java  |  9 +++
 8 files changed, 85 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index ea5866a..2163df1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -79,6 +79,11 @@ public class FunctionConfig {
 private ProcessingGuarantees processingGuarantees;
 private boolean retainOrdering;
 private Map userConfig;
+// This is a map of secretName(aka how the secret is going to be
+// accessed in the function via context) to an object that
+// encapsulates how the secret is fetched by the underlying
+// secrets provider
+private Map secrets;
 private Runtime runtime;
 private boolean autoAck;
 private int maxMessageRetries = -1;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index c4bf1ca..355c696 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -55,6 +55,11 @@ public class SinkConfig {
 private Map inputSpecs = new TreeMap<>();
 
 private Map configs;
+// This is a map of secretName(aka how the secret is going to be
+// accessed in the function via context) to an object that
+// encapsulates how the secret is fetched by the underlying
+// secrets provider
+private Map secrets;
 private int parallelism = 1;
 private FunctionConfig.ProcessingGuarantees processingGuarantees;
 private boolean retainOrdering;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 57c0f79..9dbe97c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -46,6 +46,11 @@ public class SourceConfig {
 private String schemaType;
 
 private Map configs;
+// This is a map of secretName(aka how the secret is going to be
+// accessed in the function via context) to an object that
+// encapsulates how the secret is fetched by the underlying
+// secrets provider
+private Map secrets;
 private int parallelism = 1;
 private FunctionConfig.ProcessingGuarantees processingGuarantees;
 private Resources resources;
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index b09e105..d6cc8af 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -18,6 +18,7 @@
 #
 
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
 import sys
@@ -39,7 +40,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xd4\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
+  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 

[GitHub] srkukarni commented on issue #2853: Secrets Frontend

2018-10-26 Thread GitBox
srkukarni commented on issue #2853: Secrets Frontend
URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433541994
 
 
   rerun java8 tests




[GitHub] merlimat commented on issue #2852: Add connection timeout client configuration option

2018-10-26 Thread GitBox
merlimat commented on issue #2852: Add connection timeout client configuration 
option
URL: https://github.com/apache/pulsar/pull/2852#issuecomment-433535510
 
 
   run integration tests




[GitHub] merlimat opened a new pull request #2854: Fixed ZLib decompression in C++ client

2018-10-26 Thread GitBox
merlimat opened a new pull request #2854: Fixed ZLib decompression in C++ client
URL: https://github.com/apache/pulsar/pull/2854
 
 
   ### Motivation
   
   The ZLib function `uncompress()` is not able to process data that was 
compressed by a Pulsar Java producer with ZLib. 
   
   This is due to the initialization settings used inside the 
`uncompress()` function, which differ from how ZLib is used within 
the JDK. 
   
   As a result, C++ (and derivative) consumers drop messages that were 
compressed by a Java producer. A Java consumer is able to correctly decompress 
both.
   
   ### Modifications
   
* Fixed decompression in C++ to be able to inflate both zstream terminations
* Added option in C++ perf producer to compress messages
   




[GitHub] srkukarni commented on issue #2853: Secrets Frontend

2018-10-26 Thread GitBox
srkukarni commented on issue #2853: Secrets Frontend
URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433526803
 
 
   rerun java8 tests




[GitHub] srkukarni commented on issue #2853: Secrets Frontend

2018-10-26 Thread GitBox
srkukarni commented on issue #2853: Secrets Frontend
URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433515376
 
 
   rerun java8 tests




[GitHub] srkukarni commented on issue #2853: Secrets Frontend

2018-10-26 Thread GitBox
srkukarni commented on issue #2853: Secrets Frontend
URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433515089
 
 
   rerun java8 tests




[GitHub] merlimat closed pull request #2829: Fix Websocket Consume Messages in Partitioned Topics

2018-10-26 Thread GitBox
merlimat closed pull request #2829: Fix Websocket Consume Messages in 
Partitioned Topics
URL: https://github.com/apache/pulsar/pull/2829
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index e084b04622..7f1b5aa45c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -358,7 +358,7 @@ public void testProxyStats() throws Exception {
 + "/my-sub?subscriptionType=Failover";
 final String producerUri = "ws://localhost:" + port + 
"/ws/v2/producer/persistent/" + topic + "/";
 final String readerUri = "ws://localhost:" + port + 
"/ws/v2/reader/persistent/" + topic;
-System.out.println(consumerUri+", "+producerUri);
+System.out.println(consumerUri + ", " + producerUri);
 URI consumeUri = URI.create(consumerUri);
 URI produceUri = URI.create(producerUri);
 URI readUri = URI.create(readerUri);
@@ -424,6 +424,47 @@ public void testProxyStats() throws Exception {
 }
 }
 
+@Test(timeOut = 1)
+public void consumeMessagesInPartitionedTopicTest() throws Exception {
+final String namespace = "my-property/my-ns";
+final String topic = namespace + "/" + "my-topic7";
+admin.topics().createPartitionedTopic("persistent://" + topic, 3);
+
+final String subscription = "my-sub";
+final String consumerUri = "ws://localhost:" + port + 
"/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+final String producerUri = "ws://localhost:" + port + 
"/ws/v2/producer/persistent/" + topic;
+
+URI consumeUri = URI.create(consumerUri);
+URI produceUri = URI.create(producerUri);
+
+WebSocketClient consumeClient = new WebSocketClient();
+WebSocketClient produceClient = new WebSocketClient();
+
+SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+try {
+produceClient.start();
+ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+Future producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+producerFuture.get();
+produceSocket.sendMessage(100);
+} finally {
+stopWebSocketClient(produceClient);
+}
+
+Thread.sleep(500);
+
+try {
+consumeClient.start();
+ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+Future consumerFuture = 
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+consumerFuture.get();
+} finally {
+stopWebSocketClient(consumeClient);
+}
+}
+
 private void verifyTopicStat(Client client, String baseUrl, String topic) {
 String statUrl = baseUrl + topic + "/stats";
 WebTarget webTarget = client.target(statUrl);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
index 1bb3e086ed..4d38aa949d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -22,6 +22,7 @@
 
 import java.io.Serializable;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 
 /**
  * Opaque unique identifier of a single message
@@ -49,6 +50,10 @@ public static MessageId fromByteArray(byte[] data) throws 
IOException {
 return MessageIdImpl.fromByteArray(data);
 }
 
+public static MessageId fromByteArrayWithTopic(byte[] data, TopicName 
topicName) throws IOException {
+return MessageIdImpl.fromByteArrayWithTopic(data, topicName);
+}
+
 public static final MessageId earliest = new MessageIdImpl(-1, -1, -1);
 
 public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, 
Long.MAX_VALUE, -1);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 3686b124a8..5a53436e7b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.client.api.MessageId;
 imp

[GitHub] merlimat commented on issue #1070: Provide a nodejs client

2018-10-26 Thread GitBox
merlimat commented on issue #1070: Provide a nodejs client 
URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433491319
 
 
   @mujimu There are indeed plans to start working on this soon. Thanks for the 
link. If you have JS experience, it would also be good to have your feedback on 
API ergonomics once we start working on it.




[pulsar] branch master updated: Fix Websocket Consume Messages in Partitioned Topics (#2829)

2018-10-26 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d79499d  Fix Websocket Consume Messages in Partitioned Topics (#2829)
d79499d is described below

commit d79499dbe24fbd8b065489875ba86a47761ec33f
Author: Yuto Furuta 
AuthorDate: Sat Oct 27 02:53:09 2018 +0900

Fix Websocket Consume Messages in Partitioned Topics (#2829)

* fix consume messages in partitioned topics on websocket

* add consumeMessagesInPartitionedTopicTest

* add fromByteArrayWithTopic

* remove public
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 43 +-
 .../org/apache/pulsar/client/api/MessageId.java|  5 +++
 .../apache/pulsar/client/impl/MessageIdImpl.java   | 31 
 .../apache/pulsar/websocket/ConsumerHandler.java   |  3 +-
 4 files changed, 80 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index e084b04..7f1b5aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -358,7 +358,7 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
 + "/my-sub?subscriptionType=Failover";
 final String producerUri = "ws://localhost:" + port + 
"/ws/v2/producer/persistent/" + topic + "/";
 final String readerUri = "ws://localhost:" + port + 
"/ws/v2/reader/persistent/" + topic;
-System.out.println(consumerUri+", "+producerUri);
+System.out.println(consumerUri + ", " + producerUri);
 URI consumeUri = URI.create(consumerUri);
 URI produceUri = URI.create(producerUri);
 URI readUri = URI.create(readerUri);
@@ -424,6 +424,47 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
 }
 }
 
+@Test(timeOut = 1)
+public void consumeMessagesInPartitionedTopicTest() throws Exception {
+final String namespace = "my-property/my-ns";
+final String topic = namespace + "/" + "my-topic7";
+admin.topics().createPartitionedTopic("persistent://" + topic, 3);
+
+final String subscription = "my-sub";
+final String consumerUri = "ws://localhost:" + port + 
"/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+final String producerUri = "ws://localhost:" + port + 
"/ws/v2/producer/persistent/" + topic;
+
+URI consumeUri = URI.create(consumerUri);
+URI produceUri = URI.create(producerUri);
+
+WebSocketClient consumeClient = new WebSocketClient();
+WebSocketClient produceClient = new WebSocketClient();
+
+SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+try {
+produceClient.start();
+ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+Future producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+producerFuture.get();
+produceSocket.sendMessage(100);
+} finally {
+stopWebSocketClient(produceClient);
+}
+
+Thread.sleep(500);
+
+try {
+consumeClient.start();
+ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+Future consumerFuture = 
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+consumerFuture.get();
+} finally {
+stopWebSocketClient(consumeClient);
+}
+}
+
 private void verifyTopicStat(Client client, String baseUrl, String topic) {
 String statUrl = baseUrl + topic + "/stats";
 WebTarget webTarget = client.target(statUrl);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
index 1bb3e08..4d38aa9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import java.io.Serializable;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 
 /**
  * Opaque unique identifier of a single message
@@ -49,6 +50,10 @@ public interface MessageId extends Comparable, 
Serializable {
 return MessageIdImpl.fromByteArray(data);
 }
 
+public static MessageId fromByteArrayWithTopic(byte[] data, TopicName 
topicName) throws IOException {
+return MessageIdImpl.

[GitHub] jerrypeng commented on a change in pull request #2826: Secrets Interface

2018-10-26 Thread GitBox
jerrypeng commented on a change in pull request #2826: Secrets Interface
URL: https://github.com/apache/pulsar/pull/2826#discussion_r228607992
 
 

 ##
 File path: 
pulsar-functions/secretsprovider/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
 ##
 @@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.secretsproviderconfigurator;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1EnvVar;
+import io.kubernetes.client.models.V1EnvVarSource;
+import io.kubernetes.client.models.V1SecretKeySelector;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import 
org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Context provides contextual information to the executing function.
+ * Features like which message id we are handling, whats the topic name of the
+ * message, what are our operating constraints, etc can be accessed by the
+ * executing function
+ */
+public class KubernetesSecretsProviderConfigurator implements 
SecretsProviderConfigurator {
+@Override
+public void init(Map config) {
+
+}
+
+@Override
+public String getSecretsProviderClassName(Function.FunctionDetails 
functionDetails) {
+switch (functionDetails.getRuntime()) {
+case JAVA:
+return EnvironmentBasedSecretsProvider.class.getName();
+case PYTHON:
+return "secretsprovider.EnvironmentBasedSecretsProvider";
+default:
+throw new RuntimeException("Unknown function runtime " + 
functionDetails.getRuntime());
+}
+}
+
+@Override
+public Map 
getSecretsProviderConfig(Function.FunctionDetails functionDetails) {
+return null;
+}
+
+@Override
+public void configureKubernetesRuntimeSecretsProvider(V1Container 
container, Function.FunctionDetails functionDetails) {
+if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
+Type type = new TypeToken>() {
+}.getType();
+Map secretsMap = new 
Gson().fromJson(functionDetails.getSecretsMap(), type);
 
 Review comment:
   Perhaps it's worthwhile to explicitly define the structure of the "Object" 
for Kubernetes so it is clear which configs should be there. You can probably 
just declare a class that represents the Object.
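
   A sketch of what such an explicit structure could look like, written in 
Python for brevity. The field names `path` and `key` are assumptions made for 
this example; the actual fields expected for Kubernetes secrets are not stated 
in this thread.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Mapping


@dataclass(frozen=True)
class KubernetesSecretRef:
    """Typed description of how one secret is fetched (hypothetical fields)."""
    path: str  # assumed: name of the Kubernetes secret object
    key: str   # assumed: key inside that secret

    @classmethod
    def from_dict(cls, raw: Mapping[str, Any]) -> "KubernetesSecretRef":
        missing = [field for field in ("path", "key") if field not in raw]
        if missing:
            raise ValueError("secret config is missing: " + ", ".join(missing))
        return cls(path=str(raw["path"]), key=str(raw["key"]))


def parse_secrets_map(secrets_json: str) -> Dict[str, KubernetesSecretRef]:
    """Parse a JSON map of secretName -> secret config into typed objects."""
    raw = json.loads(secrets_json)
    return {name: KubernetesSecretRef.from_dict(cfg) for name, cfg in raw.items()}
```

   Validating at parse time means a malformed secrets map fails early with a 
clear message instead of surfacing later as a cast error.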




[GitHub] jerrypeng commented on a change in pull request #2826: Secrets Interface

2018-10-26 Thread GitBox
jerrypeng commented on a change in pull request #2826: Secrets Interface
URL: https://github.com/apache/pulsar/pull/2826#discussion_r228606081
 
 

 ##
 File path: 
pulsar-functions/secretsprovider/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
 ##
 @@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.secretsprovider;
+
+import java.util.Map;
+
+/**
+ * Context provides contextual information to the executing function.
 
 Review comment:
   I don't think this is the right comment




[pulsar] branch master updated: adding Python function instance unit test (#2850)

2018-10-26 Thread jerrypeng
This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1a1cf22  adding Python function instance unit test (#2850)
1a1cf22 is described below

commit 1a1cf226a718fbaece393e8e1ed9584b59a8752b
Author: Boyang Jerry Peng 
AuthorDate: Fri Oct 26 10:14:01 2018 -0700

adding Python function instance unit test (#2850)

* adding python function instance unittests

* adding license headers
---
 bin/pulsar |  2 +-
 bin/pulsar-admin   |  2 +-
 distribution/server/src/assemble/bin.xml   |  2 +-
 pulsar-client-cpp/run-unit-tests.sh|  4 ++
 pulsar-functions/instance/pom.xml  | 31 
 .../src/scripts/run_python_instance_tests.sh   | 30 +++
 .../src/test/python/test_python_instance.py| 58 ++
 pulsar-functions/runtime/pom.xml   | 28 ---
 8 files changed, 126 insertions(+), 31 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index 9cacfe7..02a81a0 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -94,7 +94,7 @@ fi
 # find the python instance location
 if [ ! -f "${PY_INSTANCE_FILE}" ]; then
 # didn't find a released python instance, then search the built python 
instance
-
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
+
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
 if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
 echo "\nCouldn't find pulsar-functions python instance.";
 echo "Make sure you've run 'mvn package'\n";
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index 1a1339d..39d1239 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -130,7 +130,7 @@ fi
 # find the python instance location
 if [ ! -f "${PY_INSTANCE_FILE}" ]; then
 # didn't find a released python instance, then search the built python 
instance
-
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
+
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
 if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
 PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
 else
diff --git a/distribution/server/src/assemble/bin.xml 
b/distribution/server/src/assemble/bin.xml
index c9ad893..d17d4ff 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -44,7 +44,7 @@
   ${basedir}/licenses
 
 
-  
${basedir}/../../pulsar-functions/runtime/target/python-instance
+  
${basedir}/../../pulsar-functions/instance/target/python-instance
   instances/python-instance
 
 
diff --git a/pulsar-client-cpp/run-unit-tests.sh 
b/pulsar-client-cpp/run-unit-tests.sh
index ed4b6b4..cffe002 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -88,6 +88,10 @@ if [ $RES -eq 0 ]; then
 python pulsar_test.py
 RES=$?
 
+echo " Running Python Function Instance unit tests"
+bash 
/pulsar/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
+RES=$?
+
 popd
 popd
 
diff --git a/pulsar-functions/instance/pom.xml 
b/pulsar-functions/instance/pom.xml
index e206b11..1c8f3ab 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -103,4 +103,35 @@
 
   
 
+  
+
+  
+  
+org.apache.maven.plugins
+maven-antrun-plugin
+
+  
+compile
+
+  run
+
+
+  
+building python instance
+
+
+
+
+
+
+  
+
+  
+
+  
+
+  
+
 
diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh 
b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
new file mode 100644
index 000..9b33c24
--- /dev/null
+++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on

[pulsar] branch master updated: Cleanup Logging for python functions (#2847)

2018-10-26 Thread jerrypeng
This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new fa212bc  Cleanup Logging for python functions (#2847)
fa212bc is described below

commit fa212bc8bb0cbd81729b498b6041cde794be9684
Author: Sanjeev Kulkarni 
AuthorDate: Fri Oct 26 10:13:39 2018 -0700

Cleanup Logging for python functions (#2847)

* Cleanup Logging for python functions

* More statements to debug

* More debug statements

* More debug
---
 .../instance/src/main/python/python_instance.py|  8 ++---
 .../instance/src/main/python/server.py |  8 ++---
 pulsar-functions/instance/src/main/python/util.py  | 37 ++
 3 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 03dafae..5f1645d 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -187,7 +187,7 @@ class PythonInstance(object):
   else:
 serde_kclass = util.import_class(os.path.dirname(self.user_code), 
serde)
   self.input_serdes[topic] = serde_kclass()
-  Log.info("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
+  Log.debug("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
   self.consumers[topic] = self.pulsar_client.subscribe(
 str(topic), subscription_name,
 consumer_type=mode,
@@ -201,7 +201,7 @@ class PythonInstance(object):
   else:
 serde_kclass = util.import_class(os.path.dirname(self.user_code), 
consumer_conf.serdeClassName)
   self.input_serdes[topic] = serde_kclass()
-  Log.info("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
+  Log.debug("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
   if consumer_conf.isRegexPattern:
 self.consumers[topic] = self.pulsar_client.subscribe(
   re.compile(str(topic)), subscription_name,
@@ -237,7 +237,7 @@ class PythonInstance(object):
   Timer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer).start()
 
   def actual_execution(self):
-Log.info("Started Thread for executing the function")
+Log.debug("Started Thread for executing the function")
 while True:
   msg = self.queue.get(True)
   if isinstance(msg, InternalQuitMessage):
@@ -321,7 +321,7 @@ class PythonInstance(object):
   def setup_producer(self):
 if self.instance_config.function_details.sink.topic != None and \
 len(self.instance_config.function_details.sink.topic) > 0:
-  Log.info("Setting up producer for topic %s" % 
self.instance_config.function_details.sink.topic)
+  Log.debug("Setting up producer for topic %s" % 
self.instance_config.function_details.sink.topic)
   self.producer = self.pulsar_client.create_producer(
 str(self.instance_config.function_details.sink.topic),
 block_if_queue_full=True,
diff --git a/pulsar-functions/instance/src/main/python/server.py 
b/pulsar-functions/instance/src/main/python/server.py
index 611a737..58d43d2 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -35,20 +35,20 @@ class 
InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr
 self.pyinstance = pyinstance
 
   def GetFunctionStatus(self, request, context):
-Log.info("Came in GetFunctionStatus")
+Log.debug("Came in GetFunctionStatus")
 return self.pyinstance.get_function_status()
 
   def GetAndResetMetrics(self, request, context):
-Log.info("Came in GetAndResetMetrics")
+Log.debug("Came in GetAndResetMetrics")
 return self.pyinstance.get_and_reset_metrics()
 
   def ResetMetrics(self, request, context):
-Log.info("Came in ResetMetrics")
+Log.debug("Came in ResetMetrics")
 self.pyinstance.reset_metrics()
 return request
 
   def GetMetrics(self, request, context):
-Log.info("Came in GetMetrics")
+Log.debug("Came in GetMetrics")
 return self.pyinstance.get_metrics()
 
   def HealthCheck(self, request, context):
diff --git a/pulsar-functions/instance/src/main/python/util.py 
b/pulsar-functions/instance/src/main/python/util.py
index 4736457..56c1ce1 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -36,38 +36,33 @@ PULSAR_FUNCTIONS_API_ROOT = 'functions'
 def import_class(from_path, full_class_name):
   from_path = str(from_path)
   full_class_name = str(full_class_name)
-  kclass = import_class_from_path(from_path, full_class_name)
-  if kclass is None:
+  try:
+re

[GitHub] jerrypeng closed pull request #2850: adding Python function instance unit test

2018-10-26 Thread GitBox
jerrypeng closed pull request #2850: adding Python function instance unit test
URL: https://github.com/apache/pulsar/pull/2850
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/pulsar b/bin/pulsar
index 9cacfe7782..02a81a04ef 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -94,7 +94,7 @@ fi
 # find the python instance location
 if [ ! -f "${PY_INSTANCE_FILE}" ]; then
 # didn't find a released python instance, then search the built python 
instance
-
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
+
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
 if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
 echo "\nCouldn't find pulsar-functions python instance.";
 echo "Make sure you've run 'mvn package'\n";
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index 1a1339d8ab..39d12393b2 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -130,7 +130,7 @@ fi
 # find the python instance location
 if [ ! -f "${PY_INSTANCE_FILE}" ]; then
 # didn't find a released python instance, then search the built python 
instance
-
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
+
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
 if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
 PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
 else
diff --git a/distribution/server/src/assemble/bin.xml 
b/distribution/server/src/assemble/bin.xml
index c9ad893d56..d17d4ff39f 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -44,7 +44,7 @@
   ${basedir}/licenses
 
 
-  
${basedir}/../../pulsar-functions/runtime/target/python-instance
+  
${basedir}/../../pulsar-functions/instance/target/python-instance
   instances/python-instance
 
 
diff --git a/pulsar-client-cpp/run-unit-tests.sh 
b/pulsar-client-cpp/run-unit-tests.sh
index ed4b6b4a1c..cffe0023b7 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -88,6 +88,10 @@ if [ $RES -eq 0 ]; then
 python pulsar_test.py
 RES=$?
 
+echo " Running Python Function Instance unit tests"
+bash 
/pulsar/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
+RES=$?
+
 popd
 popd
 
diff --git a/pulsar-functions/instance/pom.xml 
b/pulsar-functions/instance/pom.xml
index e206b111cb..1c8f3ab78e 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -103,4 +103,35 @@
 
   
 
+  
+
+  
+  
+org.apache.maven.plugins
+maven-antrun-plugin
+
+  
+compile
+
+  run
+
+
+  
+building python instance
+
+
+
+
+
+
+  
+
+  
+
+  
+
+  
+
 
diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh 
b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
new file mode 100644
index 00..9b33c24481
--- /dev/null
+++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# Make sure dependencies are installed
+pip install mock --user
+pip install protobuf --user
+
+CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
+PULSAR_HOME=$CUR_DIR/../../../../
+
+# run instance tests
+PULSAR_HOME=${PULSAR_HOME} 
PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance 
python -m unittest discover -v 
${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/test/python/test_python_instanc

[GitHub] jerrypeng closed pull request #2847: Cleanup Logging for python functions

2018-10-26 Thread GitBox
jerrypeng closed pull request #2847: Cleanup Logging for python functions
URL: https://github.com/apache/pulsar/pull/2847
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 03dafaef2f..5f1645dd74 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -187,7 +187,7 @@ def run(self):
   else:
 serde_kclass = util.import_class(os.path.dirname(self.user_code), 
serde)
   self.input_serdes[topic] = serde_kclass()
-  Log.info("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
+  Log.debug("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
   self.consumers[topic] = self.pulsar_client.subscribe(
 str(topic), subscription_name,
 consumer_type=mode,
@@ -201,7 +201,7 @@ def run(self):
   else:
 serde_kclass = util.import_class(os.path.dirname(self.user_code), 
consumer_conf.serdeClassName)
   self.input_serdes[topic] = serde_kclass()
-  Log.info("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
+  Log.debug("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
   if consumer_conf.isRegexPattern:
 self.consumers[topic] = self.pulsar_client.subscribe(
   re.compile(str(topic)), subscription_name,
@@ -237,7 +237,7 @@ def run(self):
   Timer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer).start()
 
   def actual_execution(self):
-Log.info("Started Thread for executing the function")
+Log.debug("Started Thread for executing the function")
 while True:
   msg = self.queue.get(True)
   if isinstance(msg, InternalQuitMessage):
@@ -321,7 +321,7 @@ def setup_output_serde(self):
   def setup_producer(self):
 if self.instance_config.function_details.sink.topic != None and \
 len(self.instance_config.function_details.sink.topic) > 0:
-  Log.info("Setting up producer for topic %s" % 
self.instance_config.function_details.sink.topic)
+  Log.debug("Setting up producer for topic %s" % 
self.instance_config.function_details.sink.topic)
   self.producer = self.pulsar_client.create_producer(
 str(self.instance_config.function_details.sink.topic),
 block_if_queue_full=True,
diff --git a/pulsar-functions/instance/src/main/python/server.py 
b/pulsar-functions/instance/src/main/python/server.py
index 611a737031..58d43d204d 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -35,20 +35,20 @@ def __init__(self, pyinstance):
 self.pyinstance = pyinstance
 
   def GetFunctionStatus(self, request, context):
-Log.info("Came in GetFunctionStatus")
+Log.debug("Came in GetFunctionStatus")
 return self.pyinstance.get_function_status()
 
   def GetAndResetMetrics(self, request, context):
-Log.info("Came in GetAndResetMetrics")
+Log.debug("Came in GetAndResetMetrics")
 return self.pyinstance.get_and_reset_metrics()
 
   def ResetMetrics(self, request, context):
-Log.info("Came in ResetMetrics")
+Log.debug("Came in ResetMetrics")
 self.pyinstance.reset_metrics()
 return request
 
   def GetMetrics(self, request, context):
-Log.info("Came in GetMetrics")
+Log.debug("Came in GetMetrics")
 return self.pyinstance.get_metrics()
 
   def HealthCheck(self, request, context):
diff --git a/pulsar-functions/instance/src/main/python/util.py 
b/pulsar-functions/instance/src/main/python/util.py
index 4736457bbc..56c1ce1c9f 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -36,38 +36,33 @@
 def import_class(from_path, full_class_name):
   from_path = str(from_path)
   full_class_name = str(full_class_name)
-  kclass = import_class_from_path(from_path, full_class_name)
-  if kclass is None:
+  try:
+return import_class_from_path(from_path, full_class_name)
+  except Exception as e:
 our_dir = 
os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe(
 api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT)
-kclass = import_class_from_path(api_dir, full_class_name)
-  return kclass
+try:
+  return import_class_from_path(api_dir, full_class_name)
+except Exception as e:
+  Log.info("Failed to import class %s from path %s" % (full_class_name, 
from_path))
+  Log.info(e, exc_info=True)
+  return None
 
 def import_class_from_path(from_path, full_
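
The fallback pattern in the `util.py` hunk above (try the user's path first, then the bundled API directory, and only log and return `None` when both fail) can be sketched in isolation as follows. This is a minimal sketch, not the code from the PR: the body of `import_class_from_path` is truncated in this message, so the version here is a hypothetical stand-in built on `importlib`, and the `fallback_path` parameter replaces the `api_dir` computation from the diff.

```python
import importlib
import logging
import sys

Log = logging.getLogger("import_sketch")


def import_class_from_path(from_path, full_class_name):
    # Hypothetical stand-in for the helper truncated in the diff:
    # import "module.Class" with from_path temporarily on sys.path.
    module_name, _, class_name = full_class_name.rpartition(".")
    if not module_name:
        raise ImportError("expected 'module.Class', got %r" % full_class_name)
    sys.path.insert(0, from_path)
    try:
        module = importlib.import_module(module_name)
    finally:
        sys.path.remove(from_path)
    return getattr(module, class_name)


def import_class(from_path, fallback_path, full_class_name):
    # Try the user's path first, then the fallback; log only if both fail.
    last_error = None
    for path in (str(from_path), str(fallback_path)):
        try:
            return import_class_from_path(path, str(full_class_name))
        except Exception as e:
            last_error = e
    Log.info("Failed to import class %s from paths %s and %s",
             full_class_name, from_path, fallback_path, exc_info=last_error)
    return None
```

Returning `None` rather than re-raising matches the diff, which means callers still have to check the result before instantiating the class.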

[GitHub] srkukarni opened a new pull request #2853: Secrets Frontend

2018-10-26 Thread GitBox
srkukarni opened a new pull request #2853: Secrets Frontend
URL: https://github.com/apache/pulsar/pull/2853
 
 
   ### Motivation
   
   This PR defines the FunctionConfig part of the secrets implementation. It 
defines how users can specify secrets in their function/source/sink configs and 
how those secrets are translated to FunctionDetails.
   Follow-up PRs that actually implement secrets will come after this one.
   




[GitHub] srkukarni commented on issue #2853: Secrets Frontend

2018-10-26 Thread GitBox
srkukarni commented on issue #2853: Secrets Frontend
URL: https://github.com/apache/pulsar/pull/2853#issuecomment-433473480
 
 
   @cckellogg Please take a look. Thanks!




[GitHub] srkukarni commented on issue #2826: Secrets Interface

2018-10-26 Thread GitBox
srkukarni commented on issue #2826: Secrets Interface
URL: https://github.com/apache/pulsar/pull/2826#issuecomment-433470126
 
 
   I will break this change into smaller PRs; it might be best to comment 
on each individual PR.




[GitHub] mujimu commented on issue #1070: Provide a nodejs client

2018-10-26 Thread GitBox
mujimu commented on issue #1070: Provide a nodejs client 
URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433457173
 
 
   A more recent example of wrapping an underlying C++ library as a Node.js library:
   https://github.com/Blizzard/node-rdkafka
   
   




[GitHub] mujimu commented on issue #1070: Provide a nodejs client

2018-10-26 Thread GitBox
mujimu commented on issue #1070: Provide a nodejs client 
URL: https://github.com/apache/pulsar/issues/1070#issuecomment-433455906
 
 
   bump




[GitHub] cckellogg commented on a change in pull request #2826: Secrets Interface

2018-10-26 Thread GitBox
cckellogg commented on a change in pull request #2826: Secrets Interface
URL: https://github.com/apache/pulsar/pull/2826#discussion_r228576463
 
 

 ##
 File path: pulsar-functions/instance/src/main/python/secretsprovider.py
 ##
 @@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# -*- encoding: utf-8 -*-
+
+"""secretsprovider.py: Interfaces and definitions for Secret Providers
+"""
+from abc import abstractmethod
+import os
+
+class SecretsProvider:
 
 Review comment:
   I think this is OK now that I think about it more. More implementations can 
be added later.




[GitHub] cckellogg commented on a change in pull request #2826: Secrets Interface

2018-10-26 Thread GitBox
cckellogg commented on a change in pull request #2826: Secrets Interface
URL: https://github.com/apache/pulsar/pull/2826#discussion_r228569533
 
 

 ##
 File path: pulsar-functions/instance/src/main/python/secretsprovider.py
 ##
 @@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# -*- encoding: utf-8 -*-
+
+"""secretsprovider.py: Interfaces and definitions for Secret Providers
+"""
+from abc import abstractmethod
+import os
+
+class SecretsProvider:
 
 Review comment:
   I think the implementation of these should be provider-specific, for example 
a KubernetesSecretProvider that knows how to read the secret in the 
container. The same applies to Java.




[GitHub] rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 in 2.1.1-incubating

2018-10-26 Thread GitBox
rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 
in 2.1.1-incubating
URL: https://github.com/apache/pulsar/issues/2795#issuecomment-432384264
 
 
   I thought this was a bug, but it turns out that it was an issue with how to 
enable authorization. I can't remember exactly where, but the documentation was 
a bit confusing about which properties to enable for authentication vs. 
authorization. You will get this error if the following property is not set in 
your broker.conf file:
   
   ```sh
   # Configuration to enable Authorization
   authorizationEnabled=true
   ```
   Once you restart the broker with this update, the broker log file should 
contain a line similar to:
   ```sh
   [main] INFO  org.apache.pulsar.broker.authorization.AuthorizationService - 
org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider has been 
loaded.
   ```
   Then you can run your command again and it should succeed:
   ```sh
   ./pulsar-admin namespaces grant-permission public/pulsar-gke/default 
--actions produce,consume --role oisaac
   ```
   I am using v2.1.1-incubating and it works like a champ.
   
   I hope this helps!
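
A quick way to confirm the setting before restarting the broker is to parse the file and check the key. The following is a minimal sketch, assuming `broker.conf` uses plain `key=value` lines with `#` comments; the function names are hypothetical, and treating a missing key as disabled is an assumption rather than documented behavior.

```python
def read_broker_conf(text):
    # Parse "key=value" lines, skipping blanks and "#" comments.
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def authorization_enabled(text):
    # Assumption: a missing key is treated the same as "false".
    value = read_broker_conf(text).get("authorizationEnabled", "false")
    return value.lower() == "true"
```

For example, `authorization_enabled(open("conf/broker.conf").read())` would report whether the property is set, where the path `conf/broker.conf` is itself an assumption about the installation layout.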
   




[GitHub] rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 in 2.1.1-incubating

2018-10-26 Thread GitBox
rsamo edited a comment on issue #2795: namespace grant-permission Returns 500 
in 2.1.1-incubating
URL: https://github.com/apache/pulsar/issues/2795#issuecomment-432384264
 
 
   I thought this was a bug, but it turns out that it was an issue with how to 
enable authorization. I can't remember exactly where, but the documentation was 
a bit confusing about which properties to enable for authentication vs. 
authorization. You will get this error if the following property is not set in 
your broker.conf file:
   
   ```sh
   # Configuration to enable Authorization
   authorizationEnabled=true
   ```
   Once you bounce the broker with this update, the broker log file should 
contain a line similar to:
   ```sh
   [main] INFO  org.apache.pulsar.broker.authorization.AuthorizationService - 
org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider has been 
loaded.
   ```
   
   I am using v2.1.1-incubating and it works like a champ.
   
   I hope this helps!
   




[GitHub] ivankelly commented on issue #2852: Add connection timeout client configuration option

2018-10-26 Thread GitBox
ivankelly commented on issue #2852: Add connection timeout client configuration 
option
URL: https://github.com/apache/pulsar/pull/2852#issuecomment-433399375
 
 
   @cckellogg 




[GitHub] ivankelly opened a new pull request #2852: Add connection timeout client configuration option

2018-10-26 Thread GitBox
ivankelly opened a new pull request #2852: Add connection timeout client 
configuration option
URL: https://github.com/apache/pulsar/pull/2852
 
 
   Allows the client to specify how long to wait for brokers to respond.
   




[GitHub] srkukarni commented on issue #2826: Secrets Interface

2018-10-26 Thread GitBox
srkukarni commented on issue #2826: Secrets Interface
URL: https://github.com/apache/pulsar/pull/2826#issuecomment-433365590
 
 
   @merlimat @cckellogg I've made the changes as requested. Please take 
another look.




[GitHub] srkukarni closed pull request #2851: Use the right field to get the filename

2018-10-26 Thread GitBox
srkukarni closed pull request #2851: Use the right field to get the filename
URL: https://github.com/apache/pulsar/pull/2851
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 087238c6af..89e3b48620 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -243,8 +243,8 @@ public Response registerFunction(final String tenant, final 
String namespace, fi
 
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
file, worker().getDlogNamespace());
 } else {
 
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, 
namespace, componentName,
-fileDetail.getName()));
-
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName());
+fileDetail.getFileName()));
+
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
 log.info("Uploading {} package to {}", componentType, 
packageLocationMetaDataBuilder.getPackagePath());
 
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
uploadedInputStreamAsFile, worker().getDlogNamespace());
 }


 




[pulsar] branch master updated: Use the right field to get the filename (#2851)

2018-10-26 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 138c863  Use the right field to get the filename (#2851)
138c863 is described below

commit 138c863af0a43bd3423a7f3fb3846d2cb8dfe472
Author: Sanjeev Kulkarni 
AuthorDate: Fri Oct 26 02:57:03 2018 -0700

Use the right field to get the filename (#2851)
---
 .../org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 087238c..89e3b48 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -243,8 +243,8 @@ public class FunctionsImpl {
 
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
file, worker().getDlogNamespace());
 } else {
 
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, 
namespace, componentName,
-fileDetail.getName()));
-
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName());
+fileDetail.getFileName()));
+
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
 log.info("Uploading {} package to {}", componentType, 
packageLocationMetaDataBuilder.getPackagePath());
 
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
uploadedInputStreamAsFile, worker().getDlogNamespace());
 }



[pulsar] branch master updated: Make IdentitySerde handle bytes as well (#2848)

2018-10-26 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b867907  Make IdentitySerde handle bytes as well (#2848)
b867907 is described below

commit b8679078dc6632795c4d81cff9caedd316444f17
Author: Sanjeev Kulkarni 
AuthorDate: Fri Oct 26 02:56:45 2018 -0700

Make IdentitySerde handle bytes as well (#2848)

* Make IdentitySerde handle bytes as well

* Added more information about type of object passed
---
 pulsar-client-cpp/python/pulsar/functions/serde.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py 
b/pulsar-client-cpp/python/pulsar/functions/serde.py
index ee6366c..968c1c9 100644
--- a/pulsar-client-cpp/python/pulsar/functions/serde.py
+++ b/pulsar-client-cpp/python/pulsar/functions/serde.py
@@ -75,7 +75,9 @@ class IdentitySerDe(SerDe):
   def serialize(self, input):
 if type(input) in self._types:
   return str(input).encode('utf-8')
-raise TypeError
+if type(input) == bytes:
+  return input
+raise TypeError("IdentitySerde cannot serialize object of type %s" % 
type(input))
 
   def deserialize(self, input_bytes):
 for typ in self._types:
@@ -83,4 +85,4 @@ class IdentitySerDe(SerDe):
 return typ(input_bytes.decode('utf-8'))
   except:
 pass
-raise TypeError
+return input_bytes



[GitHub] srkukarni closed pull request #2848: Make IdentitySerde handle bytes as well

2018-10-26 Thread GitBox
srkukarni closed pull request #2848: Make IdentitySerde handle bytes as well
URL: https://github.com/apache/pulsar/pull/2848
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py 
b/pulsar-client-cpp/python/pulsar/functions/serde.py
index ee6366cf63..968c1c9bc3 100644
--- a/pulsar-client-cpp/python/pulsar/functions/serde.py
+++ b/pulsar-client-cpp/python/pulsar/functions/serde.py
@@ -75,7 +75,9 @@ def __init__(self):
   def serialize(self, input):
 if type(input) in self._types:
   return str(input).encode('utf-8')
-raise TypeError
+if type(input) == bytes:
+  return input
+raise TypeError("IdentitySerde cannot serialize object of type %s" % 
type(input))
 
   def deserialize(self, input_bytes):
 for typ in self._types:
@@ -83,4 +85,4 @@ def deserialize(self, input_bytes):
 return typ(input_bytes.decode('utf-8'))
   except:
 pass
-raise TypeError
+return input_bytes
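
The behavior after this change can be sketched as a standalone class: `bytes` input passes through `serialize` untouched, and `deserialize` returns the raw bytes when no listed type can parse them instead of raising. This is a minimal sketch, not the library class; the name `IdentitySerDeSketch` is hypothetical, and the contents of `_types` are an assumption because the constructor is not shown in the diff.

```python
class IdentitySerDeSketch(object):
    def __init__(self):
        # Assumption: the constructor is not shown in the diff.
        self._types = [int, float, complex, str]

    def serialize(self, input):
        if type(input) in self._types:
            return str(input).encode('utf-8')
        if type(input) == bytes:
            # New in this change: bytes pass through unchanged.
            return input
        raise TypeError(
            "IdentitySerde cannot serialize object of type %s" % type(input))

    def deserialize(self, input_bytes):
        for typ in self._types:
            try:
                return typ(input_bytes.decode('utf-8'))
            except Exception:
                pass
        # New in this change: fall back to the raw bytes instead of raising.
        return input_bytes
```

One consequence worth noting is that a round trip does not always preserve the type: bytes that happen to decode as a number or as UTF-8 text come back as `int`, `float`, `complex`, or `str` rather than as `bytes`.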


 




[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics

2018-10-26 Thread GitBox
maskit commented on a change in pull request #2829: Fix Websocket Consume 
Messages in Partitioned Topics
URL: https://github.com/apache/pulsar/pull/2829#discussion_r228456096
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
 ##
 @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
         return messageId;
     }
 
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
+        checkNotNull(data);
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length));
+        PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder();
+
+        PulsarApi.MessageIdData idData;
+        try {
+            idData = builder.mergeFrom(inputStream, null).build();
+        } catch (UninitializedMessageException e) {
+            throw new IOException(e);
+        }
+
+        MessageId messageId;
+        if (idData.hasBatchIndex()) {
+            messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
+                idData.getBatchIndex());
+        } else {
+            messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
+        }
+        if (idData.getPartition() > -1 && topicName != null) {
 
 Review comment:
   Oh, we need to pass it to the constructor... sounds odd though.




[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics

2018-10-26 Thread GitBox
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
URL: https://github.com/apache/pulsar/pull/2829#discussion_r228455442
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
 ##
 @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
         return messageId;
     }
 
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
+        checkNotNull(data);
+        ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length));
+        PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder();
+
+        PulsarApi.MessageIdData idData;
+        try {
+            idData = builder.mergeFrom(inputStream, null).build();
+        } catch (UninitializedMessageException e) {
+            throw new IOException(e);
+        }
+
+        MessageId messageId;
+        if (idData.hasBatchIndex()) {
+            messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
+                idData.getBatchIndex());
+        } else {
+            messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
+        }
+        if (idData.getPartition() > -1 && topicName != null) {
 
 Review comment:
   It seems like we can merge these two if statements.




[GitHub] maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics

2018-10-26 Thread GitBox
maskit commented on a change in pull request #2829: Fix Websocket Consume Messages in Partitioned Topics
URL: https://github.com/apache/pulsar/pull/2829#discussion_r228454245
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 ##
 @@ -31,7 +31,7 @@
     private final String topicName;
     private final MessageId messageId;
 
-    TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
+    public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
 
 Review comment:
   Is this `public` modifier still needed?




[GitHub] maskit commented on issue #2814: Fix memory issue in cpp ZTSClient

2018-10-26 Thread GitBox
maskit commented on issue #2814: Fix memory issue in cpp ZTSClient
URL: https://github.com/apache/pulsar/pull/2814#issuecomment-47349
 
 
   run java8 tests




[GitHub] srkukarni commented on issue #2851: Use the right field to get the filename

2018-10-26 Thread GitBox
srkukarni commented on issue #2851: Use the right field to get the filename
URL: https://github.com/apache/pulsar/pull/2851#issuecomment-41679
 
 
   rerun java8 tests




[GitHub] srkukarni commented on issue #2848: Make IdentitySerde handle bytes as well

2018-10-26 Thread GitBox
srkukarni commented on issue #2848: Make IdentitySerde handle bytes as well
URL: https://github.com/apache/pulsar/pull/2848#issuecomment-41483
 
 
   rerun java8 tests

