[GitHub] [pulsar] pouledodue opened a new issue #4937: Retrieve schema instance from schema info for type 'NONE'
pouledodue opened a new issue #4937: Retrieve schema instance from schema info for type 'NONE' URL: https://github.com/apache/pulsar/issues/4937 ``` pulsar-admin functions trigger --fqfn test/app/func1 --trigger-value yoshi java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Retrieve schema instance from schema info for type 'NONE' is not supported yet Reason: HTTP 500 Internal Server Error ``` What am I doing wrong? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] fxbing commented on issue #4933: Backlog size calculation errors can result in unlimited messages being produced
fxbing commented on issue #4933: Backlog size calculation errors can result in unlimited messages being produced URL: https://github.com/apache/pulsar/issues/4933#issuecomment-520293734 fixed in #4936 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] fxbing opened a new pull request #4936: Fix backlog size bug
fxbing opened a new pull request #4936: Fix backlog size bug URL: https://github.com/apache/pulsar/pull/4936 fix #4933 Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies even if not consumed. But in fact, if the topic has not been consumed, you can produce unlimited messages. So `getEstimatedBacklogSize()` should return `getTotalSize()` instead of 0 when `pos == null`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] zymap closed pull request #4925: [Transaction][Buffer] add transaction suffix on the topic name
zymap closed pull request #4925: [Transaction][Buffer] add transaction suffix on the topic name URL: https://github.com/apache/pulsar/pull/4925 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] zymap commented on issue #4935: [WIP][Transaction][Buffer] handle command `EndTxnOnPartitiion`
zymap commented on issue #4935: [WIP][Transaction][Buffer] handle command `EndTxnOnPartitiion` URL: https://github.com/apache/pulsar/pull/4935#issuecomment-520292790 @sijie Please help me take a look when you have time. Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #4789: [Doc] Add Schema Chapter
Anonymitaet commented on issue #4789: [Doc] Add Schema Chapter URL: https://github.com/apache/pulsar/issues/4789#issuecomment-520292037 All schema related PRs have been merged. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet closed issue #4789: [Doc] Add Schema Chapter
Anonymitaet closed issue #4789: [Doc] Add Schema Chapter URL: https://github.com/apache/pulsar/issues/4789 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet edited a comment on issue #4905: [Doc] Fix display issue of swagger doc
Anonymitaet edited a comment on issue #4905: [Doc] Fix display issue of swagger doc URL: https://github.com/apache/pulsar/issues/4905#issuecomment-520291885 Fixed in https://github.com/apache/pulsar/pull/4901 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #4905: [Doc] Fix display issue of swagger doc
Anonymitaet commented on issue #4905: [Doc] Fix display issue of swagger doc URL: https://github.com/apache/pulsar/issues/4905#issuecomment-520291885 Fixed in https://github.com/apache/pulsar/pull/4901#event-2548070643 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] zymap closed pull request #4923: [Transaction][buffer] create a commit marker at topic ledger
zymap closed pull request #4923: [Transaction][buffer] create a commit marker at topic ledger URL: https://github.com/apache/pulsar/pull/4923 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet closed issue #4905: [Doc] Fix display issue of swagger doc
Anonymitaet closed issue #4905: [Doc] Fix display issue of swagger doc URL: https://github.com/apache/pulsar/issues/4905 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] zymap opened a new pull request #4935: [WIP][Transaction][Buffer] handle command `EndTxnOnPartitiion`
zymap opened a new pull request #4935: [WIP][Transaction][Buffer] handle command `EndTxnOnPartitiion` URL: https://github.com/apache/pulsar/pull/4935 ### Motivation *Add handler for the command `EndTxnOnPartitioin* This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet commented on issue #4934: [Doc] Update Schema Guide
Anonymitaet commented on issue #4934: [Doc] Update Schema Guide URL: https://github.com/apache/pulsar/pull/4934#issuecomment-520291380 @sijie could you please help review? Thank you This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] Anonymitaet opened a new pull request #4934: [Doc] Update Schema Guide
Anonymitaet opened a new pull request #4934: [Doc] Update Schema Guide URL: https://github.com/apache/pulsar/pull/4934 1. Add descriptions. 2. Delete the following files because their contents are already in the new Schema Guide. - concepts-schema-registry.md (contents are moved [here1](https://pulsar.apache.org/docs/en/next/schema-get-started/) and [here2](https://pulsar.apache.org/docs/en/next/schema-understand/#schema-version)) - developing-schema.md (contents are moved [here](https://pulsar.apache.org/docs/en/next/schema-manage/)) - admin-api-schemas.md (contents are moved [here](https://pulsar.apache.org/docs/en/next/schema-manage/)) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] fxbing opened a new issue #4933: Backlog size calculation errors can result in unlimited messages being produced
fxbing opened a new issue #4933: Backlog size calculation errors can result in unlimited messages being produced URL: https://github.com/apache/pulsar/issues/4933 **Describe the bug** Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies even if not consumed. But in fact, if the topic has not been consumed, you can produce unlimited messages. https://github.com/apache/pulsar/blob/be7b24f9f8aa67b2235e523485249aef8d2a611a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L921-L926 **To Reproduce** Set a limited BaclogQuotaSize to produce unlimited messages without consuming. **Expected behavior** Limited production message This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version
congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version URL: https://github.com/apache/pulsar/pull/4877#discussion_r312772646 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoData.java ## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Accessors(chain = true) +@Builder +public class KeyValueSchemaInfoData { Review comment: > @congbobo any reason creating a new class? Can't you just `KeyValue`? yes, there should use keyValue, and then i will change it and add the test for it . This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version
congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version URL: https://github.com/apache/pulsar/pull/4877#discussion_r312772719 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoData.java ## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Accessors(chain = true) +@Builder +public class KeyValueSchemaInfoData { Review comment: > @congbobo any reason creating a new class? Can't you just `KeyValue`? yes, there should use keyValue, and then i will change it and add the test for it . This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version
congbobo184 commented on a change in pull request #4877: Add schema admin api get schema info with schema version URL: https://github.com/apache/pulsar/pull/4877#discussion_r312772646 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoData.java ## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Accessors(chain = true) +@Builder +public class KeyValueSchemaInfoData { Review comment: > @congbobo any reason creating a new class? Can't you just `KeyValue`? yes, there should use keyValue, and then i will change it and add the test for it . This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] zymap commented on a change in pull request #4927: [Transaction][Buffer] Provide an asynchronous method to create a transaction buffer
zymap commented on a change in pull request #4927: [Transaction][Buffer] Provide an asynchronous method to create a transaction buffer URL: https://github.com/apache/pulsar/pull/4927#discussion_r312772433 ## File path: pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferProvider.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.pulsar.transaction.buffer.impl; + +import java.util.concurrent.CompletableFuture; +import lombok.Setter; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.transaction.buffer.TransactionBufferProvider; + +public class PersistentTransactionBufferProvider implements TransactionBufferProvider { + +private final BrokerService brokerService; +private final String topic; +private final String txnTopic; + +public PersistentTransactionBufferProvider(BrokerService service, String topic) { +this.brokerService = service; +// TODO: get the transaction topic name by the TopicName.getPersistentNamingEncoding(isTxn) +this.txnTopic = TopicName.get(topic).getPersistenceNamingEncoding() + "/_txnlog"; +this.topic = topic; +} + +@Override +public CompletableFuture newTransactionBuffer() { +CompletableFuture newBufferFuture = new CompletableFuture<>(); + brokerService.getManagedLedgerConfig(TopicName.get(topic)).whenComplete((config, err) -> { Review comment: The config can change before opening the ledger. We need to set the `createIfMissing` as true. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#issuecomment-520282807 run cpp tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] sijie merged pull request #4921: [4920][proxy] Add option to disable authentication for proxy /metrics
sijie merged pull request #4921: [4920][proxy] Add option to disable authentication for proxy /metrics URL: https://github.com/apache/pulsar/pull/4921 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] sijie closed issue #4920: `/metrics` endpoint in proxy is behind authentication
sijie closed issue #4920: `/metrics` endpoint in proxy is behind authentication URL: https://github.com/apache/pulsar/issues/4920 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: Add option to disable authentication for proxy /metrics (#4921)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new be7b24f Add option to disable authentication for proxy /metrics (#4921) be7b24f is described below commit be7b24f9f8aa67b2235e523485249aef8d2a611a Author: Addison Higham AuthorDate: Sun Aug 11 20:01:53 2019 -0600 Add option to disable authentication for proxy /metrics (#4921) This commit adds a new option optionally disable authentication for the `/metrics` endpoint in the pulsar-proxy. Currently, authentication is required for the metrics endpoint when authentication is enabled, which makes monitoring more difficult. However, rather than just disable it completely and allow for metrics to be exposed to any unknown user, this makes it opt in. It could be argued that it should default to false, but as it is likely that the proxy is the only component potentially exposed to the public internet, we default to not exposing data. Fixes #4920 --- .../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java | 6 ++ .../java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java| 3 ++- .../src/main/java/org/apache/pulsar/proxy/server/WebServer.java | 6 +- site2/docs/reference-configuration.md | 1 + 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index c0f7096..6a293a0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -197,6 +197,12 @@ public class ProxyConfiguration implements PulsarConfiguration { + "to take effect" ) private boolean forwardAuthorizationCredentials = false; +@FieldContext( +category = CATEGORY_AUTHENTICATION, +doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true." ++ "'authenticationEnabled' must also be set for this to take effect." +) +private boolean authenticateMetricsEndpoint = true; @FieldContext( diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 3e48c01..5a563e5 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -45,6 +45,7 @@ import org.apache.pulsar.common.configuration.VipStatus; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.Collections; import java.util.Date; @@ -174,7 +175,7 @@ public class ProxyServiceStarter { static void addWebServerHandlers(WebServer server, ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) { -server.addServlet("/metrics", new ServletHolder(MetricsServlet.class)); +server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); server.addRestResources("/", VipStatus.class.getPackage().getName(), VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index 2c4a4c2..b4ebe35 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -127,6 +127,10 @@ public class WebServer { } public void addServlet(String basePath, ServletHolder servletHolder, List> attributes) { +addServlet(basePath, servletHolder, attributes, true); +} + +public void addServlet(String basePath, ServletHolder servletHolder, List> attributes, boolean requireAuthentication) { Optional existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst(); if (existingPath.isPresent()) { throw new IllegalArgumentException( @@ -140,7 +144,7 @@ public class WebServer { for (Pair attribute : attributes) { context.setAttribute(attribute.getLeft(), attribute.getRight()); } -if (config.isAuthenticationEnabled()) { +if (config.isAuthenticationEnabled() && requireAuthentication) { FilterHolder filter = new FilterHolder(new AuthenticationFilter(authenticationService));
[GitHub] [pulsar] sijie commented on issue #4910: [doc] Add a documentation page for metrics reference
sijie commented on issue #4910: [doc] Add a documentation page for metrics reference URL: https://github.com/apache/pulsar/pull/4910#issuecomment-520281659 run cpp tests run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[pulsar] branch master updated: [pulsar-client] Fix broken replication msg to specific cluster (#4930)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 02f9fd3 [pulsar-client] Fix broken replication msg to specific cluster (#4930) 02f9fd3 is described below commit 02f9fd3e055f404501e41413ec1081331ff945c3 Author: Rajan Dhabalia AuthorDate: Sun Aug 11 18:41:19 2019 -0700 [pulsar-client] Fix broken replication msg to specific cluster (#4930) --- .../pulsar/broker/service/ReplicatorTest.java | 39 ++ .../apache/pulsar/common/protocol/Commands.java| 3 ++ 2 files changed, 42 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index eddb7cd..f68214a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase { } +@Test +public void testReplicatedCluster() throws Exception { + +log.info("--- Starting ReplicatorTest::testReplicatedCluster ---"); + +final String namespace = "pulsar/global/repl"; +final String topicName = String.format("persistent://%s/topic1", namespace); +admin1.namespaces().createNamespace(namespace); +admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); +admin1.topics().createPartitionedTopic(topicName, 4); + +PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) +.build(); +PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) +.build(); + +Producer producer1 = client1.newProducer().topic(topicName).create(); +org.apache.pulsar.client.api.Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); +org.apache.pulsar.client.api.Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); +byte[] value = "test".getBytes(); + +// publish message local only +TypedMessageBuilder msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value); +msg.send(); +assertEquals(consumer1.receive().getValue(), value); + +Message msg2 = consumer2.receive(1, TimeUnit.SECONDS); +if (msg2 != null) { +fail("msg should have not been replicated to remote cluster"); +} + +consumer1.close(); +consumer2.close(); +producer1.close(); + +} + private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7d9de6f..70ac8e4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1420,6 +1420,9 @@ public class Commands { if (builder.hasReplicatedFrom()) { messageMetadata.setReplicatedFrom(builder.getReplicatedFrom()); } +if (builder.getReplicateToCount() > 0) { +messageMetadata.addAllReplicateTo(builder.getReplicateToList()); +} if (builder.hasSchemaVersion()) { messageMetadata.setSchemaVersion(builder.getSchemaVersion()); }
[GitHub] [pulsar] sijie merged pull request #4930: [pulsar-client] Fix broken replication msg to specific cluster
sijie merged pull request #4930: [pulsar-client] Fix broken replication msg to specific cluster URL: https://github.com/apache/pulsar/pull/4930 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#issuecomment-520232917 @sijie Thanks for the review, i have addressed your comments, please take a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740632 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ## @@ -80,6 +91,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.pendingReceives = Queues.newConcurrentLinkedQueue(); this.schema = schema; this.interceptors = interceptors; +this.batchReceivePolicy = conf.getBatchReceivePolicy(); +this.pendingBatchReceives = Queues.newConcurrentLinkedQueue(); Review comment: fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740422 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java ## @@ -309,6 +307,36 @@ private void messageReceived(ConsumerImpl consumer, Message message) { } } +void notifyPendingBatchReceivedCallBack() { +OpBatchReceive opBatchReceive = pendingBatchReceives.poll(); +if (opBatchReceive == null || opBatchReceive.future == null) { +return; +} +notifyPendingBatchReceivedCallBack(opBatchReceive); +} + +void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) { +MessagesImpl messages = new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(), +batchReceivePolicy.getMaxNumBytes()); +Message msgPeeked = incomingMessages.peek(); Review comment: I have added a method getReUseableMessagesImpl() to avoid create lots of MessagesImpl. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740338 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -369,6 +370,52 @@ public UnAckedMessageTracker getUnAckedMessageTracker() { } } +@Override +protected Messages internalBatchReceive() throws PulsarClientException { +try { +return internalBatchReceiveAsync().get(); +} catch (InterruptedException | ExecutionException e) { +State state = getState(); +if (state != State.Closing && state != State.Closed) { +stats.incrementNumReceiveFailed(); Review comment: Yes, i will add metrics for batch receive. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740347 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java ## @@ -309,6 +307,36 @@ private void messageReceived(ConsumerImpl consumer, Message message) { } } +void notifyPendingBatchReceivedCallBack() { +OpBatchReceive opBatchReceive = pendingBatchReceives.poll(); +if (opBatchReceive == null || opBatchReceive.future == null) { +return; +} +notifyPendingBatchReceivedCallBack(opBatchReceive); +} + +void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) { +MessagesImpl messages = new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(), +batchReceivePolicy.getMaxNumBytes()); +Message msgPeeked = incomingMessages.peek(); +while (msgPeeked != null && messages.canAdd(msgPeeked)) { +Message msg = null; +try { +msg = incomingMessages.poll(0L, TimeUnit.MILLISECONDS); +} catch (InterruptedException e) { +// ignore Review comment: yes fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740282 ## File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.concurrent.TimeUnit; + +/** + * Configuration for message batch receive {@link Consumer#batchReceive()} {@link Consumer#batchReceiveAsync()}. + * + * Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a timeout + * for waiting for enough messages for this batch. + * + * This batch receive will be completed as long as any one of the + * conditions(has enough number of messages, has enough of size of messages, wait timeout) is met. + * + * Examples: + * + * 1.If set maxNumMessages = 10, maxSizeOfMessages = 1MB and without timeout, it + * means {@link Consumer#batchReceive()} will always wait until there is enough messages. + * + * 2.If set maxNumberOfMessages = 0, maxNumBytes = 0 and timeout = 100ms, it + * means {@link Consumer#batchReceive()} will waiting for 100ms whether or not there is enough messages. + * + * Note: + * Must specify messages limitation(maxNumMessages, maxNumBytes) or wait timeout. + * Otherwise, {@link Messages} ingest {@link Message} will never end. + * + * @since 2.4.1 + */ +public class BatchReceivePolicy { + +/** + * Default batch receive policy + * + * Max number of messages: 100 + * Max number of bytes: 10MB + * Timeout: 100ms + */ +public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy( +100, 1024 * 1024 * 10, 100, TimeUnit.MILLISECONDS); + +private BatchReceivePolicy(int maxNumMessages, long maxNumBytes, int timeout, TimeUnit timeoutUnit) { +this.maxNumMessages = maxNumMessages; +this.maxNumBytes = maxNumBytes; +this.timeout = timeout; +this.timeoutUnit = timeoutUnit; +} + +/** + * Max number of messages for a single batch receive, 0 or negative means no limit. + */ +private int maxNumMessages; Review comment: fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740300 ## File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.concurrent.TimeUnit; + +/** + * Configuration for message batch receive {@link Consumer#batchReceive()} {@link Consumer#batchReceiveAsync()}. + * + * Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a timeout + * for waiting for enough messages for this batch. + * + * This batch receive will be completed as long as any one of the + * conditions(has enough number of messages, has enough of size of messages, wait timeout) is met. + * + * Examples: + * + * 1.If set maxNumMessages = 10, maxSizeOfMessages = 1MB and without timeout, it + * means {@link Consumer#batchReceive()} will always wait until there is enough messages. + * + * 2.If set maxNumberOfMessages = 0, maxNumBytes = 0 and timeout = 100ms, it + * means {@link Consumer#batchReceive()} will waiting for 100ms whether or not there is enough messages. + * + * Note: + * Must specify messages limitation(maxNumMessages, maxNumBytes) or wait timeout. + * Otherwise, {@link Messages} ingest {@link Message} will never end. + * + * @since 2.4.1 + */ +public class BatchReceivePolicy { + +/** + * Default batch receive policy + * + * Max number of messages: 100 + * Max number of bytes: 10MB + * Timeout: 100ms + */ +public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy( +100, 1024 * 1024 * 10, 100, TimeUnit.MILLISECONDS); + +private BatchReceivePolicy(int maxNumMessages, long maxNumBytes, int timeout, TimeUnit timeoutUnit) { +this.maxNumMessages = maxNumMessages; +this.maxNumBytes = maxNumBytes; +this.timeout = timeout; +this.timeoutUnit = timeoutUnit; +} + +/** + * Max number of messages for a single batch receive, 0 or negative means no limit. + */ +private int maxNumMessages; + +/** + * Max bytes of messages for a single batch receive, 0 or negative means no limit. + */ +private long maxNumBytes; Review comment: fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740293 ## File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Messages.java ## @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.List; + +/** + * A container that holds the list {@link Message} for a topic. + * @param + */ +public interface Messages extends Iterable> { + +/** + * Get the list {@link Message} + */ +List> getMessageList(); Review comment: remove it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740260 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ## @@ -148,25 +139,53 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat "Cannot use receive() when a listener has been set"); } -switch (getState()) { -case Ready: -case Connecting: -break; // Ok -case Closing: -case Closed: -throw new PulsarClientException.AlreadyClosedException("Consumer already closed"); -case Terminated: -throw new PulsarClientException.AlreadyClosedException("Topic was terminated"); -case Failed: -case Uninitialized: -throw new PulsarClientException.NotConnectedException(); +PulsarClientException exception = verifyConsumerState(); +if (exception != null) { +throw exception; } - return internalReceive(timeout, unit); } abstract protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException; +@Override +public Messages batchReceive() throws PulsarClientException { +if (listener != null) { Review comment: I have added a new method for verify batch receive, because batchReceive() need to handle InterruptedException and ExecutionException and then unwrap them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312740276 ## File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.concurrent.TimeUnit; + +/** + * Configuration for message batch receive {@link Consumer#batchReceive()} {@link Consumer#batchReceiveAsync()}. + * + * Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a timeout + * for waiting for enough messages for this batch. + * + * This batch receive will be completed as long as any one of the + * conditions(has enough number of messages, has enough of size of messages, wait timeout) is met. + * + * Examples: + * + * 1.If set maxNumMessages = 10, maxSizeOfMessages = 1MB and without timeout, it + * means {@link Consumer#batchReceive()} will always wait until there is enough messages. + * + * 2.If set maxNumberOfMessages = 0, maxNumBytes = 0 and timeout = 100ms, it + * means {@link Consumer#batchReceive()} will waiting for 100ms whether or not there is enough messages. + * + * Note: + * Must specify messages limitation(maxNumMessages, maxNumBytes) or wait timeout. + * Otherwise, {@link Messages} ingest {@link Message} will never end. + * + * @since 2.4.1 + */ +public class BatchReceivePolicy { + +/** + * Default batch receive policy + * + * Max number of messages: 100 + * Max number of bytes: 10MB + * Timeout: 100ms + */ +public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy( Review comment: fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312739935 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java ## @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import com.google.common.base.Preconditions; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Messages; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class MessagesImpl implements Messages { Review comment: fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312739722 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ## @@ -367,4 +410,98 @@ protected void onAckTimeoutSend(Set messageIds) { interceptors. onAckTimeoutSend(this, messageIds); } } + +protected boolean canEnqueueMessage(Message message) { +// Default behavior, can be overridden in subclasses +return true; +} + +protected boolean enqueueMessageAndCheckBatchReceive(Message message) { +if (canEnqueueMessage(message)) { +incomingMessages.add(message); +INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.getData().length); +} +return hasEnoughMessagesForBatchReceive(); +} + +protected boolean hasEnoughMessagesForBatchReceive() { +if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) { +return false; +} +return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) +|| (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); +} + +private PulsarClientException verifyConsumerState() { Review comment: fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312739173 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java ## @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import com.google.common.base.Preconditions; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Messages; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class MessagesImpl implements Messages { + +private List> messageList; + +private final int maxNumberOfMessages; +private final long maxSizeOfMessages; + +private int currentNumberOfMessages; +private long currentSizeOfMessages; + +protected MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) { +this.maxNumberOfMessages = maxNumberOfMessages; +this.maxSizeOfMessages = maxSizeOfMessages; +messageList = maxNumberOfMessages > 0 ? new ArrayList<>(maxNumberOfMessages) : new ArrayList<>(); +} + +protected boolean canAdd(Message message) { +if (maxNumberOfMessages <= 0 && maxSizeOfMessages <= 0) { +return true; +} +return (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 <= maxNumberOfMessages) +|| (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length <= maxSizeOfMessages); +} + +protected void add(Message message) { +if (message == null) { +return; +} +Preconditions.checkArgument(canAdd(message), "No more space to add messages."); Review comment: We need canAdd to do a pre-check, because we need add intercepted messages to Messages. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312737760 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -369,6 +370,52 @@ public UnAckedMessageTracker getUnAckedMessageTracker() { } } +@Override +protected Messages internalBatchReceive() throws PulsarClientException { +try { +return internalBatchReceiveAsync().get(); +} catch (InterruptedException | ExecutionException e) { +State state = getState(); +if (state != State.Closing && state != State.Closed) { +stats.incrementNumReceiveFailed(); +throw PulsarClientException.unwrap(e); +} else { +return null; +} +} +} + +@Override +protected CompletableFuture> internalBatchReceiveAsync() { +CompletableFuture> result = new CompletableFuture<>(); +try { +lock.writeLock().lock(); Review comment: Use read lock may cause Messages return early(have not reached capacity yet), so use write lock here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [pulsar] codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client.
codelipenghui commented on a change in pull request #4621: [PIP-38] Support batch receive in java client. URL: https://github.com/apache/pulsar/pull/4621#discussion_r312737067 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java ## @@ -314,6 +315,14 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema schema) { return this; } +@Override +public ConsumerBuilder batchReceivePolicy(BatchReceivePolicy batchReceivePolicy) { +checkArgument(batchReceivePolicy != null, "batchReceivePolicy must not be null."); Review comment: I see all the validations are still in ConsumerBuilderImpl. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services