[GitHub] [pulsar] pouledodue opened a new issue #4937: Retrieve schema instance from schema info for type 'NONE'

2019-08-11 Thread GitBox
pouledodue opened a new issue #4937: Retrieve schema instance from schema info 
for type 'NONE' 
URL: https://github.com/apache/pulsar/issues/4937
 
 
   ```
   pulsar-admin functions trigger --fqfn test/app/func1 --trigger-value yoshi
   
   java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Retrieve schema instance from schema info for type 'NONE' is not supported yet
   
   Reason: HTTP 500 Internal Server Error
   ```
   
   What am I doing wrong?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] fxbing commented on issue #4933: Backlog size calculation errors can result in unlimited messages being produced

2019-08-11 Thread GitBox
fxbing commented on issue #4933: Backlog size calculation errors can result in 
unlimited messages being produced 
URL: https://github.com/apache/pulsar/issues/4933#issuecomment-520293734
 
 
   fixed in #4936 


[GitHub] [pulsar] fxbing opened a new pull request #4936: Fix backlog size bug

2019-08-11 Thread GitBox
fxbing opened a new pull request #4936: Fix backlog size bug
URL: https://github.com/apache/pulsar/pull/4936
 
 
   fix #4933 
   A backlog is the set of unacknowledged messages for a topic that bookies 
have stored, whether or not they have been consumed. In practice, if the topic 
has never been consumed, you can produce an unlimited number of messages. 
`getEstimatedBacklogSize()` should therefore return `getTotalSize()` instead 
of 0 when `pos == null`.
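
The behaviour described above can be sketched as a small standalone model. This is not the real `ManagedLedgerImpl` code: the class `BacklogSketch`, the `consumedBytes` field, and the `quotaExceeded` helper are assumptions made for illustration. Only `getEstimatedBacklogSize()`, `getTotalSize()`, and the "no position yet" condition come from the description.

```java
// Illustrative model only; names other than getEstimatedBacklogSize()
// and getTotalSize() are hypothetical.
final class BacklogSketch {
    private final long totalSize;      // bytes currently stored for the topic
    private final Long consumedBytes;  // stand-in for `pos`; null if nothing has consumed yet

    BacklogSketch(long totalSize, Long consumedBytes) {
        this.totalSize = totalSize;
        this.consumedBytes = consumedBytes;
    }

    long getTotalSize() {
        return totalSize;
    }

    long getEstimatedBacklogSize() {
        if (consumedBytes == null) {
            // Returning 0 here would make the topic look empty,
            // so a quota check would never trip.
            return getTotalSize();
        }
        return Math.max(0L, totalSize - consumedBytes);
    }

    // Hypothetical quota check built on the estimate.
    boolean quotaExceeded(long quotaBytes) {
        return getEstimatedBacklogSize() >= quotaBytes;
    }
}
```

With this model, a topic holding 1000 bytes and no consumer reports a backlog of 1000 bytes, so a 500-byte quota is treated as exceeded.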


[GitHub] [pulsar] zymap closed pull request #4925: [Transaction][Buffer] add transaction suffix on the topic name

2019-08-11 Thread GitBox
zymap closed pull request #4925: [Transaction][Buffer] add transaction suffix 
on the topic name
URL: https://github.com/apache/pulsar/pull/4925
 
 
   


[GitHub] [pulsar] zymap commented on issue #4935: [WIP][Transaction][Buffer] handle command `EndTxnOnPartitiion`

2019-08-11 Thread GitBox
zymap commented on issue #4935: [WIP][Transaction][Buffer] handle command 
`EndTxnOnPartitiion`
URL: https://github.com/apache/pulsar/pull/4935#issuecomment-520292790
 
 
   @sijie Please help me take a look when you have time. Thanks


[GitHub] [pulsar] Anonymitaet commented on issue #4789: [Doc] Add Schema Chapter

2019-08-11 Thread GitBox
Anonymitaet commented on issue #4789: [Doc] Add Schema Chapter
URL: https://github.com/apache/pulsar/issues/4789#issuecomment-520292037
 
 
   All schema related PRs have been merged.


[GitHub] [pulsar] Anonymitaet closed issue #4789: [Doc] Add Schema Chapter

2019-08-11 Thread GitBox
Anonymitaet closed issue #4789: [Doc] Add Schema Chapter
URL: https://github.com/apache/pulsar/issues/4789
 
 
   


[GitHub] [pulsar] Anonymitaet edited a comment on issue #4905: [Doc] Fix display issue of swagger doc

2019-08-11 Thread GitBox
Anonymitaet edited a comment on issue #4905: [Doc] Fix display issue of swagger 
doc
URL: https://github.com/apache/pulsar/issues/4905#issuecomment-520291885
 
 
   Fixed in https://github.com/apache/pulsar/pull/4901


[GitHub] [pulsar] Anonymitaet commented on issue #4905: [Doc] Fix display issue of swagger doc

2019-08-11 Thread GitBox
Anonymitaet commented on issue #4905: [Doc] Fix display issue of swagger doc
URL: https://github.com/apache/pulsar/issues/4905#issuecomment-520291885
 
 
   Fixed in https://github.com/apache/pulsar/pull/4901#event-2548070643


[GitHub] [pulsar] zymap closed pull request #4923: [Transaction][buffer] create a commit marker at topic ledger

2019-08-11 Thread GitBox
zymap closed pull request #4923: [Transaction][buffer] create a commit marker 
at topic ledger
URL: https://github.com/apache/pulsar/pull/4923
 
 
   


[GitHub] [pulsar] Anonymitaet closed issue #4905: [Doc] Fix display issue of swagger doc

2019-08-11 Thread GitBox
Anonymitaet closed issue #4905: [Doc] Fix display issue of swagger doc
URL: https://github.com/apache/pulsar/issues/4905
 
 
   


[GitHub] [pulsar] zymap opened a new pull request #4935: [WIP][Transaction][Buffer] handle command `EndTxnOnPartitiion`

2019-08-11 Thread GitBox
zymap opened a new pull request #4935: [WIP][Transaction][Buffer] handle 
command `EndTxnOnPartitiion`
URL: https://github.com/apache/pulsar/pull/4935
 
 
   
   ### Motivation
   
   *Add handler for the command `EndTxnOnPartition`*
   


[GitHub] [pulsar] Anonymitaet commented on issue #4934: [Doc] Update Schema Guide

2019-08-11 Thread GitBox
Anonymitaet commented on issue #4934: [Doc] Update Schema Guide
URL: https://github.com/apache/pulsar/pull/4934#issuecomment-520291380
 
 
   @sijie could you please help review? Thank you


[GitHub] [pulsar] Anonymitaet opened a new pull request #4934: [Doc] Update Schema Guide

2019-08-11 Thread GitBox
Anonymitaet opened a new pull request #4934: [Doc] Update Schema Guide
URL: https://github.com/apache/pulsar/pull/4934
 
 
   1. Add descriptions.
   
   2. Delete the following files because their contents are already in the new Schema Guide.
   
   
   - concepts-schema-registry.md (contents have been moved [here1](https://pulsar.apache.org/docs/en/next/schema-get-started/) and [here2](https://pulsar.apache.org/docs/en/next/schema-understand/#schema-version))
   - developing-schema.md (contents have been moved [here](https://pulsar.apache.org/docs/en/next/schema-manage/))
   - admin-api-schemas.md (contents have been moved [here](https://pulsar.apache.org/docs/en/next/schema-manage/))


[GitHub] [pulsar] fxbing opened a new issue #4933: Backlog size calculation errors can result in unlimited messages being produced

2019-08-11 Thread GitBox
fxbing opened a new issue #4933: Backlog size calculation errors can result in 
unlimited messages being produced 
URL: https://github.com/apache/pulsar/issues/4933
 
 
   **Describe the bug**
   A backlog is the set of unacknowledged messages for a topic that bookies 
have stored, whether or not they have been consumed. In practice, if the topic 
has never been consumed, you can produce an unlimited number of messages.
   
https://github.com/apache/pulsar/blob/be7b24f9f8aa67b2235e523485249aef8d2a611a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L921-L926
   
   
   **To Reproduce**
   Set a limited backlog quota size and produce messages without consuming 
them. The number of messages that can be produced is unlimited.
   
   **Expected behavior**
   Message production is limited once the backlog quota is reached.
   


[GitHub] [pulsar] congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version

2019-08-11 Thread GitBox
congbobo184 commented on a change in pull request #4877: Add schema admin api  
get schema info with schema version
URL: https://github.com/apache/pulsar/pull/4877#discussion_r312772646
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoData.java
 ##
 @@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
+@Builder
+public class KeyValueSchemaInfoData {
 
 Review comment:
   > @congbobo any reason creating a new class? Can't you just `KeyValue`?
   
   Yes, this should use `KeyValue`. I will change it and add a test for it.


[GitHub] [pulsar] congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version

2019-08-11 Thread GitBox
congbobo184 commented on a change in pull request #4877: Add schema admin api  
get schema info with schema version
URL: https://github.com/apache/pulsar/pull/4877#discussion_r312772719
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoData.java
 ##
 @@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
+@Builder
+public class KeyValueSchemaInfoData {
 
 Review comment:
   > @congbobo any reason creating a new class? Can't you just `KeyValue`?
   
   Yes, this should use `KeyValue`. I will change it and add a test for it.


[GitHub] [pulsar] zymap commented on a change in pull request #4927: [Transaction][Buffer] Provide an asynchronous method to create a transaction buffer

2019-08-11 Thread GitBox
zymap commented on a change in pull request #4927: [Transaction][Buffer] 
Provide an asynchronous method to create a transaction buffer
URL: https://github.com/apache/pulsar/pull/4927#discussion_r312772433
 
 

 ##
 File path: 
pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferProvider.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.pulsar.transaction.buffer.impl;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.Setter;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.transaction.buffer.TransactionBufferProvider;
+
+public class PersistentTransactionBufferProvider implements TransactionBufferProvider {
+
+private final BrokerService brokerService;
+private final String topic;
+private final String txnTopic;
+
+public PersistentTransactionBufferProvider(BrokerService service, String topic) {
+this.brokerService = service;
+// TODO: get the transaction topic name by the TopicName.getPersistentNamingEncoding(isTxn)
+this.txnTopic = TopicName.get(topic).getPersistenceNamingEncoding() + "/_txnlog";
+this.topic = topic;
+}
+
+@Override
+public CompletableFuture<TransactionBuffer> newTransactionBuffer() {
+CompletableFuture<TransactionBuffer> newBufferFuture = new CompletableFuture<>();
+
+brokerService.getManagedLedgerConfig(TopicName.get(topic)).whenComplete((config, err) -> {
 
 Review comment:
   The config can change before the ledger is opened. We need to set 
`createIfMissing` to true.
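
The pattern under review, completing a `CompletableFuture` from an open callback, can be sketched independently of Pulsar. Everything in this sketch is an assumption made for illustration: `AsyncOpenSketch`, the `OpenCallback` interface, and `openAsync` are hypothetical stand-ins for the managed-ledger callback API, and `createIfMissing` is modelled as a plain boolean argument rather than a config field.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a callback-based "open ledger" API.
final class AsyncOpenSketch {
    interface OpenCallback {
        void openComplete(String ledger);
        void openFailed(Exception e);
    }

    private final Map<String, String> ledgers = new ConcurrentHashMap<>();

    // Callback style: creates the ledger only when createIfMissing is true.
    void openAsync(String name, boolean createIfMissing, OpenCallback callback) {
        String ledger = createIfMissing
                ? ledgers.computeIfAbsent(name, n -> "ledger:" + n)
                : ledgers.get(name);
        if (ledger == null) {
            callback.openFailed(new IllegalStateException("ledger not found: " + name));
        } else {
            callback.openComplete(ledger);
        }
    }

    // Future style: bridges the callback into a CompletableFuture.
    CompletableFuture<String> open(String name, boolean createIfMissing) {
        CompletableFuture<String> future = new CompletableFuture<>();
        openAsync(name, createIfMissing, new OpenCallback() {
            @Override
            public void openComplete(String ledger) {
                future.complete(ledger);
            }

            @Override
            public void openFailed(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }
}
```

In this model, opening a ledger that does not exist yet fails unless `createIfMissing` is true, which is the situation the review comment asks to avoid for a new transaction buffer.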


[GitHub] [pulsar] codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java 
client.
URL: https://github.com/apache/pulsar/pull/4621#issuecomment-520282807
 
 
   run cpp tests


[GitHub] [pulsar] sijie merged pull request #4921: [4920][proxy] Add option to disable authentication for proxy /metrics

2019-08-11 Thread GitBox
sijie merged pull request #4921: [4920][proxy] Add option to disable 
authentication for proxy /metrics
URL: https://github.com/apache/pulsar/pull/4921
 
 
   


[GitHub] [pulsar] sijie closed issue #4920: `/metrics` endpoint in proxy is behind authentication

2019-08-11 Thread GitBox
sijie closed issue #4920: `/metrics` endpoint in proxy is behind authentication 
URL: https://github.com/apache/pulsar/issues/4920
 
 
   


[pulsar] branch master updated: Add option to disable authentication for proxy /metrics (#4921)

2019-08-11 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new be7b24f  Add option to disable authentication for proxy /metrics 
(#4921)
be7b24f is described below

commit be7b24f9f8aa67b2235e523485249aef8d2a611a
Author: Addison Higham 
AuthorDate: Sun Aug 11 20:01:53 2019 -0600

Add option to disable authentication for proxy /metrics (#4921)

This commit adds a new option to optionally disable authentication for the
`/metrics` endpoint in the pulsar-proxy.

Currently, authentication is required for the metrics endpoint when
authentication is enabled, which makes monitoring more difficult.
However, rather than just disable it completely and allow for metrics to
be exposed to any unknown user, this makes it opt in.

It could be argued that the option should default to false, but as it is
likely that the proxy is the only component potentially exposed to the public
internet, we default to not exposing data.

Fixes #4920
---
 .../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java | 6 ++
 .../java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java| 3 ++-
 .../src/main/java/org/apache/pulsar/proxy/server/WebServer.java | 6 +-
 site2/docs/reference-configuration.md   | 1 +
 4 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index c0f7096..6a293a0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -197,6 +197,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
 + "to take effect"
 )
 private boolean forwardAuthorizationCredentials = false;
+@FieldContext(
+category = CATEGORY_AUTHENTICATION,
+doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
++ "'authenticationEnabled' must also be set for this to take effect."
+)
+private boolean authenticateMetricsEndpoint = true;
 
 
 @FieldContext(
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 3e48c01..5a563e5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.configuration.VipStatus;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
 
 
@@ -174,7 +175,7 @@ public class ProxyServiceStarter {
 static void addWebServerHandlers(WebServer server,
  ProxyConfiguration config,
  BrokerDiscoveryProvider discoveryProvider) {
-server.addServlet("/metrics", new ServletHolder(MetricsServlet.class));
+server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
 server.addRestResources("/", VipStatus.class.getPackage().getName(),
 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 2c4a4c2..b4ebe35 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -127,6 +127,10 @@ public class WebServer {
 }
 
 public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
+addServlet(basePath, servletHolder, attributes, true);
+}
+
+public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes, boolean requireAuthentication) {
 Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
 if (existingPath.isPresent()) {
 throw new IllegalArgumentException(
@@ -140,7 +144,7 @@ public class WebServer {
 for (Pair<String, Object> attribute : attributes) {
 context.setAttribute(attribute.getLeft(), attribute.getRight());
 }
-if (config.isAuthenticationEnabled()) {
+if (config.isAuthenticationEnabled() && requireAuthentication) {
 FilterHolder filter = new FilterHolder(new AuthenticationFilter(authenticationService));
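
The combined effect of the two settings in this patch can be summarised with a small truth-table sketch. The class `MetricsAuthSketch` and the method `needsAuthFilter` are hypothetical names chosen for illustration; only the condition `authenticationEnabled && requireAuthentication` is taken from the diff above.

```java
// Illustrative only: models the condition that decides whether the
// authentication filter is attached to a servlet.
final class MetricsAuthSketch {
    static boolean needsAuthFilter(boolean authenticationEnabled, boolean requireAuthentication) {
        return authenticationEnabled && requireAuthentication;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean enabled : values) {
            for (boolean metricsAuth : values) {
                System.out.printf(
                        "authenticationEnabled=%b authenticateMetricsEndpoint=%b -> filter=%b%n",
                        enabled, metricsAuth, needsAuthFilter(enabled, metricsAuth));
            }
        }
    }
}
```

Because `authenticateMetricsEndpoint` defaults to true, the filter is still applied to `/metrics` whenever authentication is enabled, unless the operator explicitly opts out.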
 

[GitHub] [pulsar] sijie commented on issue #4910: [doc] Add a documentation page for metrics reference

2019-08-11 Thread GitBox
sijie commented on issue #4910: [doc] Add a documentation page for metrics 
reference
URL: https://github.com/apache/pulsar/pull/4910#issuecomment-520281659
 
 
   run cpp tests
   run java8 tests


[pulsar] branch master updated: [pulsar-client] Fix broken replication msg to specific cluster (#4930)

2019-08-11 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 02f9fd3  [pulsar-client] Fix broken replication msg to specific 
cluster (#4930)
02f9fd3 is described below

commit 02f9fd3e055f404501e41413ec1081331ff945c3
Author: Rajan Dhabalia 
AuthorDate: Sun Aug 11 18:41:19 2019 -0700

[pulsar-client] Fix broken replication msg to specific cluster (#4930)
---
 .../pulsar/broker/service/ReplicatorTest.java  | 39 ++
 .../apache/pulsar/common/protocol/Commands.java|  3 ++
 2 files changed, 42 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index eddb7cd..f68214a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
 }
 
+@Test
+public void testReplicatedCluster() throws Exception {
+
+log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
+
+final String namespace = "pulsar/global/repl";
+final String topicName = String.format("persistent://%s/topic1", namespace);
+admin1.namespaces().createNamespace(namespace);
+admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+admin1.topics().createPartitionedTopic(topicName, 4);
+
+PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+.build();
+PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
+.build();
+
+Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
+org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+byte[] value = "test".getBytes();
+
+// publish message local only
+TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value);
+msg.send();
+assertEquals(consumer1.receive().getValue(), value);
+
+Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+if (msg2 != null) {
+fail("msg should have not been replicated to remote cluster");
+}
+
+consumer1.close();
+consumer2.close();
+producer1.close();
+
+}
+
 private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7d9de6f..70ac8e4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1420,6 +1420,9 @@ public class Commands {
 if (builder.hasReplicatedFrom()) {
 messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
 }
+if (builder.getReplicateToCount() > 0) {
+messageMetadata.addAllReplicateTo(builder.getReplicateToList());
+}
 if (builder.hasSchemaVersion()) {
 messageMetadata.setSchemaVersion(builder.getSchemaVersion());
 }
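
To see why the three added lines matter, here is a simplified model of the metadata copy step. `MetadataSketch` and its methods are hypothetical stand-ins for the generated protobuf builders; the real classes carry many more fields and a different API. The sketch only illustrates that a repeated field has to be copied explicitly, otherwise the per-message cluster restriction is silently dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for message metadata.
final class MetadataSketch {
    private String replicatedFrom;  // null when unset
    private final List<String> replicateTo = new ArrayList<>();

    MetadataSketch setReplicatedFrom(String cluster) {
        this.replicatedFrom = cluster;
        return this;
    }

    MetadataSketch addReplicateTo(String cluster) {
        replicateTo.add(cluster);
        return this;
    }

    String getReplicatedFrom() {
        return replicatedFrom;
    }

    List<String> getReplicateToList() {
        return replicateTo;
    }

    // Field-by-field copy, in the spirit of the patched method.
    static MetadataSketch copyOf(MetadataSketch source) {
        MetadataSketch copy = new MetadataSketch();
        if (source.replicatedFrom != null) {
            copy.setReplicatedFrom(source.replicatedFrom);
        }
        if (!source.replicateTo.isEmpty()) {
            // Without this step the copy has an empty replicateTo list,
            // which no longer restricts the message to specific clusters.
            copy.replicateTo.addAll(source.replicateTo);
        }
        return copy;
    }
}
```

A message built with `addReplicateTo("r1")` keeps that restriction after `copyOf`, which matches what the new `testReplicatedCluster` test checks at the cluster level.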



[GitHub] [pulsar] sijie merged pull request #4930: [pulsar-client] Fix broken replication msg to specific cluster

2019-08-11 Thread GitBox
sijie merged pull request #4930: [pulsar-client] Fix broken replication msg to 
specific cluster
URL: https://github.com/apache/pulsar/pull/4930
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java 
client.
URL: https://github.com/apache/pulsar/pull/4621#issuecomment-520232917
 
 
   @sijie Thanks for the review. I have addressed your comments; please take a look.


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740632
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
 ##
 @@ -80,6 +91,11 @@ protected ConsumerBase(PulsarClientImpl client, String 
topic, ConsumerConfigurat
 this.pendingReceives = Queues.newConcurrentLinkedQueue();
 this.schema = schema;
 this.interceptors = interceptors;
+this.batchReceivePolicy = conf.getBatchReceivePolicy();
+this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
 
 Review comment:
   fix it


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740422
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 ##
 @@ -309,6 +307,36 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
 }
 }
 
+void notifyPendingBatchReceivedCallBack() {
+OpBatchReceive opBatchReceive = pendingBatchReceives.poll();
+if (opBatchReceive == null || opBatchReceive.future == null) {
+return;
+}
+notifyPendingBatchReceivedCallBack(opBatchReceive);
+}
+
+void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) {
+MessagesImpl<T> messages = new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(),
+batchReceivePolicy.getMaxNumBytes());
+Message<T> msgPeeked = incomingMessages.peek();
 
 Review comment:
   I have added a method getReUseableMessagesImpl() to avoid creating lots of MessagesImpl instances.


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740338
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -369,6 +370,52 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
 }
 }
 
+@Override
+protected Messages<T> internalBatchReceive() throws PulsarClientException {
+try {
+return internalBatchReceiveAsync().get();
+} catch (InterruptedException | ExecutionException e) {
+State state = getState();
+if (state != State.Closing && state != State.Closed) {
+stats.incrementNumReceiveFailed();
 
 Review comment:
   Yes, I will add metrics for batch receive.


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740347
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 ##
 @@ -309,6 +307,36 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
 }
 }
 
+void notifyPendingBatchReceivedCallBack() {
+OpBatchReceive opBatchReceive = pendingBatchReceives.poll();
+if (opBatchReceive == null || opBatchReceive.future == null) {
+return;
+}
+notifyPendingBatchReceivedCallBack(opBatchReceive);
+}
+
+void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) {
+MessagesImpl<T> messages = new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(),
+batchReceivePolicy.getMaxNumBytes());
+Message<T> msgPeeked = incomingMessages.peek();
+while (msgPeeked != null && messages.canAdd(msgPeeked)) {
+Message<T> msg = null;
+try {
+msg = incomingMessages.poll(0L, TimeUnit.MILLISECONDS);
+} catch (InterruptedException e) {
+// ignore
 
 Review comment:
   yes fix it


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740282
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration for message batch receive {@link Consumer#batchReceive()} {@link Consumer#batchReceiveAsync()}.
+ *
+ * Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a timeout
+ * for waiting for enough messages for this batch.
+ *
+ * This batch receive will be completed as long as any one of the
+ * conditions (has enough messages, has enough bytes of messages, wait timeout) is met.
+ *
+ * Examples:
+ *
+ * 1. If maxNumMessages = 10 and maxNumBytes = 1MB are set without a timeout, it
+ * means {@link Consumer#batchReceive()} will always wait until there are enough messages.
+ *
+ * 2. If maxNumMessages = 0, maxNumBytes = 0 and timeout = 100ms are set, it
+ * means {@link Consumer#batchReceive()} will wait for 100ms whether or not there are enough messages.
+ *
+ * Note:
+ * A message limit (maxNumMessages, maxNumBytes) or a wait timeout must be specified.
+ * Otherwise, ingesting {@link Message} into {@link Messages} will never end.
+ *
+ * @since 2.4.1
+ */
+public class BatchReceivePolicy {
+
+/**
+ * Default batch receive policy
+ *
+ * Max number of messages: 100
+ * Max number of bytes: 10MB
+ * Timeout: 100ms
+ */
+public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy(
+100, 1024 * 1024 * 10, 100, TimeUnit.MILLISECONDS);
+
+private BatchReceivePolicy(int maxNumMessages, long maxNumBytes, int timeout, TimeUnit timeoutUnit) {
+this.maxNumMessages = maxNumMessages;
+this.maxNumBytes = maxNumBytes;
+this.timeout = timeout;
+this.timeoutUnit = timeoutUnit;
+}
+
+/**
+ * Max number of messages for a single batch receive, 0 or negative means no limit.
+ */
+private int maxNumMessages;
 
 Review comment:
   fix it
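
   For context, the Javadoc quoted above says a batch receive completes as soon as any one of the configured conditions is met. A minimal sketch of that rule, assuming non-positive limits mean "no limit" as the field comments state and, as a further assumption, that a non-positive timeout means "no timeout". `BatchLimits` and `isComplete` are hypothetical names, not part of the Pulsar API.

```java
// Hypothetical illustration of the completion rule; not the Pulsar BatchReceivePolicy class.
class BatchLimits {
    final int maxNumMessages;  // <= 0 means no limit
    final long maxNumBytes;    // <= 0 means no limit
    final long timeoutMillis;  // <= 0 means no timeout (assumption)

    BatchLimits(int maxNumMessages, long maxNumBytes, long timeoutMillis) {
        this.maxNumMessages = maxNumMessages;
        this.maxNumBytes = maxNumBytes;
        this.timeoutMillis = timeoutMillis;
    }

    // True once any enabled condition is satisfied.
    boolean isComplete(int numMessages, long numBytes, long waitedMillis) {
        boolean enoughMessages = maxNumMessages > 0 && numMessages >= maxNumMessages;
        boolean enoughBytes = maxNumBytes > 0 && numBytes >= maxNumBytes;
        boolean timedOut = timeoutMillis > 0 && waitedMillis >= timeoutMillis;
        return enoughMessages || enoughBytes || timedOut;
    }
}
```

   With every condition disabled, `isComplete` never returns true, which is the situation the "Note" in the Javadoc warns about.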


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740300
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration for message batch receive {@link Consumer#batchReceive()} {@link Consumer#batchReceiveAsync()}.
+ *
+ * Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a timeout
+ * for waiting for enough messages for this batch.
+ *
+ * This batch receive will be completed as long as any one of the
+ * conditions (has enough messages, has enough bytes of messages, wait timeout) is met.
+ *
+ * Examples:
+ *
+ * 1. If maxNumMessages = 10 and maxNumBytes = 1MB are set without a timeout, it
+ * means {@link Consumer#batchReceive()} will always wait until there are enough messages.
+ *
+ * 2. If maxNumMessages = 0, maxNumBytes = 0 and timeout = 100ms are set, it
+ * means {@link Consumer#batchReceive()} will wait for 100ms whether or not there are enough messages.
+ *
+ * Note:
+ * A message limit (maxNumMessages, maxNumBytes) or a wait timeout must be specified.
+ * Otherwise, ingesting {@link Message} into {@link Messages} will never end.
+ *
+ * @since 2.4.1
+ */
+public class BatchReceivePolicy {
+
+/**
+ * Default batch receive policy
+ *
+ * Max number of messages: 100
+ * Max number of bytes: 10MB
+ * Timeout: 100ms
+ */
+public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy(
+100, 1024 * 1024 * 10, 100, TimeUnit.MILLISECONDS);
+
+private BatchReceivePolicy(int maxNumMessages, long maxNumBytes, int timeout, TimeUnit timeoutUnit) {
+this.maxNumMessages = maxNumMessages;
+this.maxNumBytes = maxNumBytes;
+this.timeout = timeout;
+this.timeoutUnit = timeoutUnit;
+}
+
+/**
+ * Max number of messages for a single batch receive, 0 or negative means no limit.
+ */
+private int maxNumMessages;
+
+/**
+ * Max bytes of messages for a single batch receive, 0 or negative means no limit.
+ */
+private long maxNumBytes;
 
 Review comment:
   fix it


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740293
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Messages.java
 ##
 @@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+
+/**
+ * A container that holds the list of {@link Message} for a topic.
+ * @param <T>
+ */
+public interface Messages<T> extends Iterable<Message<T>> {
+
+/**
+ * Get the list of {@link Message}
+ */
+List<Message<T>> getMessageList();
 
 Review comment:
   remove it


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740260
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
 ##
 @@ -148,25 +139,53 @@ protected ConsumerBase(PulsarClientImpl client, String 
topic, ConsumerConfigurat
 "Cannot use receive() when a listener has been set");
 }
 
-switch (getState()) {
-case Ready:
-case Connecting:
-break; // Ok
-case Closing:
-case Closed:
-throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
-case Terminated:
-throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
-case Failed:
-case Uninitialized:
-throw new PulsarClientException.NotConnectedException();
+PulsarClientException exception = verifyConsumerState();
+if (exception != null) {
+throw exception;
 }
-
 return internalReceive(timeout, unit);
 }
 
 abstract protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
 
+@Override
+public Messages<T> batchReceive() throws PulsarClientException {
+if (listener != null) {
 
 Review comment:
   I have added a new method for verifying batch receive, because batchReceive() needs to handle InterruptedException and ExecutionException and then unwrap them.
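
   The unwrapping mentioned above could look roughly like the following. This is a sketch using plain JDK types only: `ReceiveException` is a hypothetical stand-in for `PulsarClientException`, `getBlocking` is a made-up helper, and nothing here reproduces the behavior of `PulsarClientException.unwrap`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical checked exception standing in for the client exception type.
class ReceiveException extends Exception {
    ReceiveException(Throwable cause) {
        super(cause);
    }
}

class UnwrapSketch {
    // Blocks on the async result and converts the two checked exceptions
    // thrown by Future.get() into a single exception type for the caller.
    static <T> T getBlocking(CompletableFuture<T> future) throws ReceiveException {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new ReceiveException(e);
        } catch (ExecutionException e) {
            // The real failure is the cause; ExecutionException is only a wrapper.
            throw new ReceiveException(e.getCause());
        }
    }
}
```

   The synchronous method then reduces to a state check followed by one call to the blocking helper, while the async method returns the future unchanged.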


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740276
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration for message batch receive {@link Consumer#batchReceive()} {@link Consumer#batchReceiveAsync()}.
+ *
+ * Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a timeout
+ * for waiting for enough messages for this batch.
+ *
+ * This batch receive will be completed as long as any one of the
+ * conditions (has enough messages, has enough bytes of messages, wait timeout) is met.
+ *
+ * Examples:
+ *
+ * 1. If maxNumMessages = 10 and maxNumBytes = 1MB are set without a timeout, it
+ * means {@link Consumer#batchReceive()} will always wait until there are enough messages.
+ *
+ * 2. If maxNumMessages = 0, maxNumBytes = 0 and timeout = 100ms are set, it
+ * means {@link Consumer#batchReceive()} will wait for 100ms whether or not there are enough messages.
+ *
+ * Note:
+ * A message limit (maxNumMessages, maxNumBytes) or a wait timeout must be specified.
+ * Otherwise, ingesting {@link Message} into {@link Messages} will never end.
+ *
+ * @since 2.4.1
+ */
+public class BatchReceivePolicy {
+
+/**
+ * Default batch receive policy
+ *
+ * Max number of messages: 100
+ * Max number of bytes: 10MB
+ * Timeout: 100ms
+ */
+public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy(
 
 Review comment:
   fix it


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312739935
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
 ##
 @@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Messages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class MessagesImpl<T> implements Messages<T> {
 
 Review comment:
   fix it


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312739722
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
 ##
 @@ -367,4 +410,98 @@ protected void onAckTimeoutSend(Set<MessageId> messageIds) {
 interceptors.onAckTimeoutSend(this, messageIds);
 }
 }
+
+protected boolean canEnqueueMessage(Message<T> message) {
+// Default behavior, can be overridden in subclasses
+return true;
+}
+
+protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
+if (canEnqueueMessage(message)) {
+incomingMessages.add(message);
+INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.getData().length);
+}
+return hasEnoughMessagesForBatchReceive();
+}
+
+protected boolean hasEnoughMessagesForBatchReceive() {
+if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
+return false;
+}
+return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
+|| (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
+}
+
+private PulsarClientException verifyConsumerState() {
 
 Review comment:
   fix it
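
   A self-contained sketch of the check in `hasEnoughMessagesForBatchReceive()`, assuming the early-return guard is meant to cover both limits (message count and byte size). The class name and parameters are hypothetical; the real method reads these values from the policy and the incoming queue.

```java
// Hypothetical, dependency-free version of the "enough messages" predicate.
class EnoughMessagesSketch {
    static boolean hasEnough(int maxNumMessages, long maxNumBytes,
                             int queuedMessages, long queuedBytes) {
        // Neither limit is enabled: the queue contents can never complete a batch,
        // so only a timeout could.
        if (maxNumMessages <= 0 && maxNumBytes <= 0) {
            return false;
        }
        // Either enabled limit being reached is sufficient.
        return (maxNumMessages > 0 && queuedMessages >= maxNumMessages)
                || (maxNumBytes > 0 && queuedBytes >= maxNumBytes);
    }
}
```

   The case worth checking is a policy with only a byte limit, for example `hasEnough(0, 100, 1, 150)`. A guard that looked at the message limit alone would return false there and never complete the batch on size.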


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312739173
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
 ##
 @@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Messages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class MessagesImpl<T> implements Messages<T> {
+
+private List<Message<T>> messageList;
+
+private final int maxNumberOfMessages;
+private final long maxSizeOfMessages;
+
+private int currentNumberOfMessages;
+private long currentSizeOfMessages;
+
+protected MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) {
+this.maxNumberOfMessages = maxNumberOfMessages;
+this.maxSizeOfMessages = maxSizeOfMessages;
+messageList = maxNumberOfMessages > 0 ? new ArrayList<>(maxNumberOfMessages) : new ArrayList<>();
+}
+
+protected boolean canAdd(Message<T> message) {
+if (maxNumberOfMessages <= 0 && maxSizeOfMessages <= 0) {
+return true;
+}
+return (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 <= maxNumberOfMessages)
+|| (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length <= maxSizeOfMessages);
+}
+
+protected void add(Message<T> message) {
+if (message == null) {
+return;
+}
+Preconditions.checkArgument(canAdd(message), "No more space to add messages.");
 
 Review comment:
   We need canAdd to do a pre-check, because we need to add intercepted messages to Messages.
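
   The pre-check pattern described above, where the caller asks `canAdd` before taking a message off the queue, can be sketched as follows. `BoundedBatch` is a hypothetical container over raw byte arrays, not `MessagesImpl`. As a deliberate assumption it requires every enabled limit to hold, which is stricter than the `||` in the quoted hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bounded container; payloads are raw byte arrays for simplicity.
class BoundedBatch {
    private final int maxCount;   // <= 0 means no limit
    private final long maxBytes;  // <= 0 means no limit
    private final List<byte[]> items = new ArrayList<>();
    private long currentBytes;

    BoundedBatch(int maxCount, long maxBytes) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
    }

    // Pre-check: would this payload still fit under every enabled limit?
    boolean canAdd(byte[] payload) {
        if (maxCount > 0 && items.size() + 1 > maxCount) {
            return false;
        }
        if (maxBytes > 0 && currentBytes + payload.length > maxBytes) {
            return false;
        }
        return true;
    }

    // Adds the payload, rejecting it if the pre-check fails.
    void add(byte[] payload) {
        if (!canAdd(payload)) {
            throw new IllegalArgumentException("No more space to add messages.");
        }
        items.add(payload);
        currentBytes += payload.length;
    }

    int size() {
        return items.size();
    }
}
```

   A caller would peek at the head of its queue, call `canAdd` on it, and only then poll and `add`. A message that does not fit stays in the queue for the next batch.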


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312737760
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -369,6 +370,52 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
 }
 }
 
+@Override
+protected Messages<T> internalBatchReceive() throws PulsarClientException {
+try {
+return internalBatchReceiveAsync().get();
+} catch (InterruptedException | ExecutionException e) {
+State state = getState();
+if (state != State.Closing && state != State.Closed) {
+stats.incrementNumReceiveFailed();
+throw PulsarClientException.unwrap(e);
+} else {
+return null;
+}
+}
+}
+
+@Override
+protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
+CompletableFuture<Messages<T>> result = new CompletableFuture<>();
+try {
+lock.writeLock().lock();
 
 Review comment:
   Using a read lock may cause Messages to be returned early (before it has reached capacity), so a write lock is used here.
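
   One way to read the comment above: if draining the queue into a batch ran under a shared lock, other threads could change the queue between the capacity check and the polls. A minimal sketch of the exclusive-lock variant, assuming enqueueing threads take the read lock. That assumption, the class `DrainUnderWriteLock`, and its methods are illustrative only and do not reflect the actual `ConsumerImpl` locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; not the ConsumerImpl implementation.
class DrainUnderWriteLock {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Queue<String> incoming = new ConcurrentLinkedQueue<>();

    // Enqueue under the shared lock: many threads may add concurrently,
    // which is safe because the queue itself is thread-safe.
    void enqueue(String message) {
        lock.readLock().lock();
        try {
            incoming.add(message);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Drain under the exclusive lock: no enqueue can interleave between
    // the emptiness check and the polls while the batch is being built.
    List<String> drain(int maxMessages) {
        lock.writeLock().lock();
        try {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxMessages && !incoming.isEmpty()) {
                batch.add(incoming.poll());
            }
            return batch;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

   The lock is acquired before the `try` block so that `unlock()` in `finally` only runs when the lock is actually held.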


[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.

2019-08-11 Thread GitBox
codelipenghui commented on a change in pull request #4621: [PIP-38] Support 
batch receive in java client.
URL: https://github.com/apache/pulsar/pull/4621#discussion_r312737067
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 ##
 @@ -314,6 +315,14 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
 return this;
 }
 
+@Override
+public ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy) {
+checkArgument(batchReceivePolicy != null, "batchReceivePolicy must not be null.");
 
 Review comment:
   I see all the validations are still in ConsumerBuilderImpl.
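
   As an illustration of keeping validation at the builder boundary, here is a minimal sketch with hypothetical stand-in types. `PolicySketch` and `BuilderSketch` are not the Pulsar classes, and the plain `if` check stands in for Guava's `checkArgument`.

```java
// Hypothetical stand-ins; not the Pulsar builder or policy classes.
class PolicySketch {
    final int maxNumMessages;

    PolicySketch(int maxNumMessages) {
        this.maxNumMessages = maxNumMessages;
    }
}

class BuilderSketch {
    private PolicySketch policy;

    // Validate in the setter so a bad argument fails at configuration time
    // rather than later, when the consumer is created or used.
    BuilderSketch batchReceivePolicy(PolicySketch policy) {
        if (policy == null) {
            throw new IllegalArgumentException("batchReceivePolicy must not be null.");
        }
        this.policy = policy;
        return this;
    }

    PolicySketch policy() {
        return policy;
    }
}
```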

