[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211836331 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java ## @@ -2662,4 +2662,108 @@ public void received(Consumer consumer, Message message) assertEquals(latch.getCount(), 1); consumer.close(); } + +@Test +public void testDeadLetterTopic() throws Exception { + +final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + +final int maxRedeliveryCount = 2; + +final int sendMessages = 100; + +Producer producer = pulsarClient.newProducer(Schema.BYTES) +.topic(topic) +.create(); + +for (int i = 0; i < sendMessages; i++) { +producer.send("Hello Pulsar!".getBytes()); +} + +producer.close(); + +Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) +.topic(topic) +.subscriptionName("my-subscription") +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(3, TimeUnit.SECONDS) +.maxRedeliveryCount(maxRedeliveryCount) +.receiverQueueSize(100) +.maxUnackedMessagesPerConsumer(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) +.subscribe(); + +Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") +.subscriptionName("my-subscription") +.subscribe(); + +int totalReceived = 0; +do { +consumer.receive(); +totalReceived++; +} while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + +int totalInDeadLetter = 0; +do { +Message message = deadLetterConsumer.receive(); +consumer.acknowledge(message); +totalInDeadLetter++; +} while (totalInDeadLetter < sendMessages); + Review comment: Could we add some assertions to verify the result, for example that all messages are received and acked? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
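One way to act on this review suggestion is to compare the counters against the values the test setup implies once both loops have finished. The following is a minimal sketch in plain Java with no Pulsar dependency; `DeadLetterTestChecks` and its methods are hypothetical names introduced for illustration and are not part of the PR.

```java
class DeadLetterTestChecks {
    /** Deliveries expected on the original subscription: the first delivery plus each redelivery. */
    static int expectedDeliveries(int sendMessages, int maxRedeliveryCount) {
        return sendMessages * (maxRedeliveryCount + 1);
    }

    /** Throws AssertionError when the observed counters differ from what the setup implies. */
    static void verifyCounts(int totalReceived, int totalInDeadLetter,
                             int sendMessages, int maxRedeliveryCount) {
        int expected = expectedDeliveries(sendMessages, maxRedeliveryCount);
        if (totalReceived != expected) {
            throw new AssertionError("expected " + expected + " deliveries but saw " + totalReceived);
        }
        if (totalInDeadLetter != sendMessages) {
            throw new AssertionError("expected " + sendMessages
                    + " dead-letter messages but saw " + totalInDeadLetter);
        }
    }
}
```

Because the loops in the quoted test only exit once the counters reach their targets, a count check alone is weak; a stronger verification would probably also confirm that no further message arrives after the loops. That extra step is a suggestion here, not something the thread states.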
[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211835573 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java ## @@ -2662,4 +2662,108 @@ public void received(Consumer consumer, Message message) assertEquals(latch.getCount(), 1); consumer.close(); } + +@Test +public void testDeadLetterTopic() throws Exception { + +final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + +final int maxRedeliveryCount = 2; + +final int sendMessages = 100; + +Producer producer = pulsarClient.newProducer(Schema.BYTES) +.topic(topic) +.create(); + +for (int i = 0; i < sendMessages; i++) { +producer.send("Hello Pulsar!".getBytes()); Review comment: How about including "i" in the message content? It may be a little useful for debugging.
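The suggestion amounts to making each payload carry its loop index so that a failing run shows which message was involved. A small sketch of that idea follows; the class name `IndexedPayloads`, the bracketed format, and the parsing helper are assumptions made for illustration and do not come from the PR.

```java
import java.nio.charset.StandardCharsets;

class IndexedPayloads {
    /** Builds a payload that embeds the loop index, e.g. "Hello Pulsar! [7]". */
    static byte[] payload(int i) {
        return String.format("Hello Pulsar! [%d]", i).getBytes(StandardCharsets.UTF_8);
    }

    /** Recovers the index from a payload produced by payload(int). */
    static int indexOf(byte[] data) {
        String s = new String(data, StandardCharsets.UTF_8);
        return Integer.parseInt(s.substring(s.lastIndexOf('[') + 1, s.lastIndexOf(']')));
    }
}
```

With distinct payloads, a test could also collect the indexes seen on the dead letter topic and check that every sent message shows up there.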
[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211835151 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/utils/Quorum.java ## @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.utils; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class Quorum { Review comment: It may be better to add some comments, such as how to use the class and the meaning of each public method.
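The body of `Quorum` is not shown in this message, so the following is only a guess at the kind of documentation the reviewer is asking for. It is a hypothetical majority-vote helper built from the same three imports; the class name `QuorumSketch`, its constructor, and its methods are all assumptions and should not be read as the PR's actual API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Hypothetical majority-vote helper. Create it with the number of participants
 * and a callback, then call succeed() or fail() once per participant. The
 * callback fires at most once, as soon as the outcome is decided.
 */
class QuorumSketch {
    private final int total;
    private final int required;
    private final AtomicInteger successes = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final AtomicBoolean decided = new AtomicBoolean(false);
    private final Consumer<Boolean> onDecision;

    QuorumSketch(int total, Consumer<Boolean> onDecision) {
        this.total = total;
        this.required = total / 2 + 1; // strict majority
        this.onDecision = onDecision;
    }

    /** Records one positive vote; reports true once a majority has succeeded. */
    void succeed() {
        if (successes.incrementAndGet() >= required && decided.compareAndSet(false, true)) {
            onDecision.accept(true);
        }
    }

    /** Records one negative vote; reports false once a majority can no longer be reached. */
    void fail() {
        if (failures.incrementAndGet() > total - required && decided.compareAndSet(false, true)) {
            onDecision.accept(false);
        }
    }
}
```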
[GitHub] merlimat commented on issue #2423: [client] add properties to consumer for cpp & python client
merlimat commented on issue #2423: [client] add properties to consumer for cpp & python client URL: https://github.com/apache/incubator-pulsar/pull/2423#issuecomment-414917527 @sijie This seems to be a dup of #2420; which one should get merged?
[GitHub] merlimat commented on issue #2423: [client] add properties to consumer for cpp & python client
merlimat commented on issue #2423: [client] add properties to consumer for cpp & python client URL: https://github.com/apache/incubator-pulsar/pull/2423#issuecomment-414917313 run integration tests
[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211831981 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ## @@ -556,13 +618,90 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { -positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId())); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); } +if (maxRedeliveryCount > 0 && redeliveryTracker != null) { +for (PositionImpl position : positions) { +if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) { +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +} else { +messagesToDeadLetter.add(position); +} +} +if (messagesToDeadLetter.size() > 0) { +CountDownLatch latch = new CountDownLatch(messagesToDeadLetter.size()); +for (PositionImpl position : messagesToDeadLetter) { +cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { +@Override +public void readEntryComplete(Entry entry, Object ctx) { +if (entry == null) { +log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position); +latch.countDown(); +} else { +try { +ByteBuf headersAndPayload = entry.getDataBuffer(); +MessageImpl msg = MessageImpl.deserialize(headersAndPayload); +headersAndPayload.retain(); +msg.setReplicatedFrom("DLQ"); +CompletableFuture future = deadLetterTopicProducer.sendAsync(msg); +future.whenCompleteAsync((messageId, error) -> { +if (error != null) { +log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}", +name, consumer, deadLetterTopic, error.getMessage(), error); + messagesToReplay.add(position.getLedgerId(), 
position.getEntryId()); +latch.countDown(); +} else { +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +latch.countDown(); +} +}); +} catch (Throwable t) { +log.error("[{}-{}] Failed to deserialize message at {} {} {} {}", name, consumer, +entry.getPosition(), entry.getLedgerId(), t.getMessage(), t); +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +entry.release(); +latch.countDown(); +} +} +} +@Override +public void readEntryFailed(ManagedLedgerException exception, Object ctx) { +log.error("[{}-{}] Read entries failed {} {}", name, consumer, exception.getMessage(), exception); +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +latch.countDown(); +} +}, null); +} +try { Review comment: I already have a [commit](https://github.com/apache/incubator-pulsar/pull/2400/commits/dc31b2fcba17bf304f9a4cfdda7a5b7e0508b65b) for this; please help me review it.
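The routing rule in the quoted diff (replay a position while its redelivery count is at most `maxRedeliveryCount`, otherwise queue it for the dead letter topic) can be illustrated in isolation. The sketch below is a simplified stand-in, assuming string keys in place of `PositionImpl` and plain lists in place of the dispatcher's own collections; `RedeliveryRouter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RedeliveryRouter {
    private final int maxRedeliveryCount;
    // Redelivery count per position; the key stands in for a ledgerId:entryId pair.
    private final Map<String, Integer> counts = new HashMap<>();
    final List<String> toReplay = new ArrayList<>();
    final List<String> toDeadLetter = new ArrayList<>();

    RedeliveryRouter(int maxRedeliveryCount) {
        this.maxRedeliveryCount = maxRedeliveryCount;
    }

    /** Increments each position's count and routes it the way the diff's if/else does. */
    void redeliver(List<String> positions) {
        for (String position : positions) {
            int count = counts.merge(position, 1, Integer::sum);
            if (count <= maxRedeliveryCount) {
                toReplay.add(position);
            } else {
                toDeadLetter.add(position);
            }
        }
    }
}
```

With `maxRedeliveryCount = 2`, a position is replayed on its first two redelivery requests and routed to the dead letter list on the third, which matches the `sendMessages * (maxRedeliveryCount + 1)` delivery total used in the PR's test.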
[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211831981 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ## @@ -556,13 +618,90 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { -positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId())); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); } +if (maxRedeliveryCount > 0 && redeliveryTracker != null) { +for (PositionImpl position : positions) { +if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) { +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +} else { +messagesToDeadLetter.add(position); +} +} +if (messagesToDeadLetter.size() > 0) { +CountDownLatch latch = new CountDownLatch(messagesToDeadLetter.size()); +for (PositionImpl position : messagesToDeadLetter) { +cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { +@Override +public void readEntryComplete(Entry entry, Object ctx) { +if (entry == null) { +log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position); +latch.countDown(); +} else { +try { +ByteBuf headersAndPayload = entry.getDataBuffer(); +MessageImpl msg = MessageImpl.deserialize(headersAndPayload); +headersAndPayload.retain(); +msg.setReplicatedFrom("DLQ"); +CompletableFuture future = deadLetterTopicProducer.sendAsync(msg); +future.whenCompleteAsync((messageId, error) -> { +if (error != null) { +log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}", +name, consumer, deadLetterTopic, error.getMessage(), error); + messagesToReplay.add(position.getLedgerId(), 
position.getEntryId()); +latch.countDown(); +} else { +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +latch.countDown(); +} +}); +} catch (Throwable t) { +log.error("[{}-{}] Failed to deserialize message at {} {} {} {}", name, consumer, +entry.getPosition(), entry.getLedgerId(), t.getMessage(), t); +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +entry.release(); +latch.countDown(); +} +} +} +@Override +public void readEntryFailed(ManagedLedgerException exception, Object ctx) { +log.error("[{}-{}] Read entries failed {} {}", name, consumer, exception.getMessage(), exception); +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +latch.countDown(); +} +}, null); +} +try { Review comment: I already have a commit for this: [commit](https://github.com/apache/incubator-pulsar/pull/2400/commits/dc31b2fcba17bf304f9a4cfdda7a5b7e0508b65b). Please help me review it.
[GitHub] srkukarni closed pull request #2422: Improve the documentation about publish function
srkukarni closed pull request #2422: Improve the documentation about publish function URL: https://github.com/apache/incubator-pulsar/pull/2422 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index fba82a7579..2856b7cab0 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -166,11 +166,11 @@ *The name of the topic for publishing * @param object *The object that needs to be published - * @param topicsPattern + * @param schemaOrSerdeClassName *Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class * @return A future that completes when the framework is done publishing the message */ - CompletableFuture publish(String topicName, O object, String schemaType); + CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName); /** * Publish an object to the topic using default schemas diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 18e1fe3514..7265e456cc 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -259,8 +259,8 @@ public ByteBuffer getState(String key) { @SuppressWarnings("unchecked") @Override -public CompletableFuture publish(String topicName, O 
object, String serDeClassName) { -return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, serDeClassName)); +public CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName) { +return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName)); } @SuppressWarnings("unchecked")
[incubator-pulsar] branch master updated: Improve the documentation about publish function (#2422)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6ddc2ad Improve the documentation about publish function (#2422) 6ddc2ad is described below commit 6ddc2add9402cdb2ec5b57646ca08812ca00bd51 Author: Sanjeev Kulkarni AuthorDate: Tue Aug 21 20:18:28 2018 -0700 Improve the documentation about publish function (#2422) --- .../src/main/java/org/apache/pulsar/functions/api/Context.java| 4 ++-- .../main/java/org/apache/pulsar/functions/instance/ContextImpl.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index fba82a7..2856b7c 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -166,11 +166,11 @@ public interface Context { *The name of the topic for publishing * @param object *The object that needs to be published - * @param topicsPattern + * @param schemaOrSerdeClassName *Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class * @return A future that completes when the framework is done publishing the message */ - CompletableFuture publish(String topicName, O object, String schemaType); + CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName); /** * Publish an object to the topic using default schemas diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 18e1fe3..7265e45 100644 --- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -259,8 +259,8 @@ class ContextImpl implements Context, SinkContext, SourceContext { @SuppressWarnings("unchecked") @Override -public CompletableFuture publish(String topicName, O object, String serDeClassName) { -return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, serDeClassName)); +public CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName) { +return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName)); } @SuppressWarnings("unchecked")
[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211807555 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ## @@ -556,13 +618,90 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { -positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId())); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); } +if (maxRedeliveryCount > 0 && redeliveryTracker != null) { +for (PositionImpl position : positions) { +if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) { +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +} else { +messagesToDeadLetter.add(position); +} +} +if (messagesToDeadLetter.size() > 0) { +CountDownLatch latch = new CountDownLatch(messagesToDeadLetter.size()); +for (PositionImpl position : messagesToDeadLetter) { +cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { +@Override +public void readEntryComplete(Entry entry, Object ctx) { +if (entry == null) { +log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position); +latch.countDown(); +} else { +try { +ByteBuf headersAndPayload = entry.getDataBuffer(); +MessageImpl msg = MessageImpl.deserialize(headersAndPayload); +headersAndPayload.retain(); +msg.setReplicatedFrom("DLQ"); +CompletableFuture future = deadLetterTopicProducer.sendAsync(msg); +future.whenCompleteAsync((messageId, error) -> { +if (error != null) { +log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}", +name, consumer, deadLetterTopic, error.getMessage(), error); + messagesToReplay.add(position.getLedgerId(), 
position.getEntryId()); +latch.countDown(); +} else { +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +latch.countDown(); +} +}); +} catch (Throwable t) { +log.error("[{}-{}] Failed to deserialize message at {} {} {} {}", name, consumer, +entry.getPosition(), entry.getLedgerId(), t.getMessage(), t); +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +entry.release(); +latch.countDown(); +} +} +} +@Override +public void readEntryFailed(ManagedLedgerException exception, Object ctx) { +log.error("[{}-{}] Read entries failed {} {}", name, consumer, exception.getMessage(), exception); +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +latch.countDown(); +} +}, null); +} +try { Review comment: I previously thought that the call to readMoreEntries() should wait for the callbacks to complete, so the client would also have to wait for them. After discussing with Jia Zhai, we think that readMoreEntries() should wait for the callbacks to complete, but the client does not need to. Another problem is when to call readMoreEntries(): on every callback, or only once after all pending callbacks have completed? Another possible issue is w
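The "only once after all pending callbacks" option raised above can be done without blocking the caller by counting outstanding callbacks and letting the last one to finish trigger the follow-up action. This is a minimal sketch under that assumption; `CompletionGate` and `whenAllComplete` are hypothetical names, and the `Runnable` stands in for whatever would call `readMoreEntries()` in the dispatcher.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionGate {
    /**
     * Runs onAllDone exactly once, on whichever thread completes the last future.
     * Failed futures count as completed. The calling thread never blocks.
     */
    static void whenAllComplete(List<CompletableFuture<?>> futures, Runnable onAllDone) {
        if (futures.isEmpty()) {
            onAllDone.run();
            return;
        }
        AtomicInteger pending = new AtomicInteger(futures.size());
        for (CompletableFuture<?> future : futures) {
            future.whenComplete((value, error) -> {
                // Only the callback that brings the counter to zero triggers the action.
                if (pending.decrementAndGet() == 0) {
                    onAllDone.run();
                }
            });
        }
    }
}
```

Compared with awaiting a `CountDownLatch` inside a synchronized method, this keeps the redelivery call from holding its lock while reads and sends are in flight. The trade-off is that the follow-up action runs on a callback thread, so it would need its own synchronization.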
[GitHub] rdhabalia commented on issue #2416: Fix: authorization while redirecting function admin call
rdhabalia commented on issue #2416: Fix: authorization while redirecting function admin call URL: https://github.com/apache/incubator-pulsar/pull/2416#issuecomment-414884730 rerun java8 tests
[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211807555 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ## @@ -556,13 +618,90 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { -positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId())); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); } +if (maxRedeliveryCount > 0 && redeliveryTracker != null) { +for (PositionImpl position : positions) { +if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) { +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +} else { +messagesToDeadLetter.add(position); +} +} +if (messagesToDeadLetter.size() > 0) { +CountDownLatch latch = new CountDownLatch(messagesToDeadLetter.size()); +for (PositionImpl position : messagesToDeadLetter) { +cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { +@Override +public void readEntryComplete(Entry entry, Object ctx) { +if (entry == null) { +log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position); +latch.countDown(); +} else { +try { +ByteBuf headersAndPayload = entry.getDataBuffer(); +MessageImpl msg = MessageImpl.deserialize(headersAndPayload); +headersAndPayload.retain(); +msg.setReplicatedFrom("DLQ"); +CompletableFuture future = deadLetterTopicProducer.sendAsync(msg); +future.whenCompleteAsync((messageId, error) -> { +if (error != null) { +log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}", +name, consumer, deadLetterTopic, error.getMessage(), error); + messagesToReplay.add(position.getLedgerId(), 
position.getEntryId()); +latch.countDown(); +} else { +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +latch.countDown(); +} +}); +} catch (Throwable t) { +log.error("[{}-{}] Failed to deserialize message at {} {} {} {}", name, consumer, +entry.getPosition(), entry.getLedgerId(), t.getMessage(), t); +cursor.asyncDelete(position, deleteCallback, position); +redeliveryTracker.remove(position); +entry.release(); +latch.countDown(); +} +} +} +@Override +public void readEntryFailed(ManagedLedgerException exception, Object ctx) { +log.error("[{}-{}] Read entries failed {} {}", name, consumer, exception.getMessage(), exception); +messagesToReplay.add(position.getLedgerId(), position.getEntryId()); +latch.countDown(); +} +}, null); +} +try { Review comment: Oh, I thought that if the previous redeliverUnacknowledgedMessages call has returned but its callbacks have not completed yet, and another redeliverUnacknowledgedMessages call arrives before the previous messagesToReplay entries are deleted, then the current messagesToReplay would still contain the previous call's items. I also thought that the call to readMoreEntries() should wait for the callbacks to complete.
[GitHub] srkukarni commented on issue #2422: Improve the documentation about publish function
srkukarni commented on issue #2422: Improve the documentation about publish function URL: https://github.com/apache/incubator-pulsar/pull/2422#issuecomment-414859049 run integration tests
[GitHub] srkukarni commented on issue #2422: Improve the documentation about publish function
srkukarni commented on issue #2422: Improve the documentation about publish function URL: https://github.com/apache/incubator-pulsar/pull/2422#issuecomment-414859021 run java8 tests
[GitHub] srkukarni commented on issue #2422: Improve the documentation about publish function
srkukarni commented on issue #2422: Improve the documentation about publish function URL: https://github.com/apache/incubator-pulsar/pull/2422#issuecomment-414858935 run cpp tests
[GitHub] sijie commented on a change in pull request #2423: [client] add properties to consumer for cpp & python client
sijie commented on a change in pull request #2423: [client] add properties to consumer for cpp & python client URL: https://github.com/apache/incubator-pulsar/pull/2423#discussion_r211793997 ## File path: pulsar-client-cpp/docker-build.sh ## @@ -41,4 +41,4 @@ DOCKER_CMD="docker run -i -v $ROOT_DIR:/pulsar $IMAGE" find . -name CMakeCache.txt | xargs rm -f find . -name CMakeFiles | xargs rm -rf -$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && make check-format && make" +$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && make format && make check-format && make" Review comment: That was merged by mistake. Revert it.
[GitHub] rdhabalia commented on a change in pull request #2421: Fix bug during user publish
rdhabalia commented on a change in pull request #2421: Fix bug during user publish URL: https://github.com/apache/incubator-pulsar/pull/2421#discussion_r211787361 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java ## @@ -269,9 +269,11 @@ public ByteBuffer getState(String key) { if (producer == null) { try { -Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()).schema(schema).create(); +Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()) Review comment: Can we please add a unit test here that covers this path?
[GitHub] merlimat commented on a change in pull request #2423: [client] add properties to consumer for cpp & python client
merlimat commented on a change in pull request #2423: [client] add properties to consumer for cpp & python client URL: https://github.com/apache/incubator-pulsar/pull/2423#discussion_r211785148 ## File path: pulsar-client-cpp/docker-build.sh ## @@ -41,4 +41,4 @@ DOCKER_CMD="docker run -i -v $ROOT_DIR:/pulsar $IMAGE" find . -name CMakeCache.txt | xargs rm -f find . -name CMakeFiles | xargs rm -rf -$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && make check-format && make" +$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && make format && make check-format && make" Review comment: `make format` should not always be performed (though it would be nice to expose it as a script that runs inside Docker).
[incubator-pulsar] branch master updated: remove 'to delete' (#2418)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3bd1ad9 remove 'to delete' (#2418) 3bd1ad9 is described below commit 3bd1ad9b999c3a9b307f856f9022049ca2166bad Author: Yuto Furuta AuthorDate: Wed Aug 22 07:44:14 2018 +0900 remove 'to delete' (#2418) ### Motivation https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#get-1 https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#getstatus https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#list-2 All `to delete` in description of options of the above URLs are unnecessary. ### Modifications Remove `to delete`. --- site2/docs/reference-pulsar-admin.md | 16 .../version-2.1.0-incubating/reference-pulsar-admin.md | 16 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 51e5713..48fec1a 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -416,9 +416,9 @@ $ pulsar-admin functions get options Options |Flag|Description|Default| |---|---|---| -|`--name`|The name of the function to delete|| -|`--namespace`|The namespace of the function to delete|| -|`--tenant`|The tenant of the function to delete|| +|`--name`|The name of the function|| +|`--namespace`|The namespace of the function|| +|`--tenant`|The tenant of the function|| ### `getstatus` @@ -432,9 +432,9 @@ $ pulsar-admin functions getstatus options Options |Flag|Description|Default| |---|---|---| -|`--name`|The name of the function to delete|| -|`--namespace`|The namespace of the function to delete|| -|`--tenant`|The tenant of the function to delete|| +|`--name`|The name of the function|| +|`--namespace`|The namespace of the function|| +|`--tenant`|The tenant of the function|| ### `list` @@ -448,8 +448,8 @@ $ pulsar-admin 
functions list options Options |Flag|Description|Default| |---|---|---| -|`--namespace`|The namespace of the function to delete|| -|`--tenant`|The tenant of the function to delete|| +|`--namespace`|The namespace of the function|| +|`--tenant`|The tenant of the function|| ### `querystate` diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md b/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md index 43d3ef6..e1d2260 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md +++ b/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md @@ -417,9 +417,9 @@ $ pulsar-admin functions get options Options |Flag|Description|Default| |---|---|---| -|`--name`|The name of the function to delete|| -|`--namespace`|The namespace of the function to delete|| -|`--tenant`|The tenant of the function to delete|| +|`--name`|The name of the function|| +|`--namespace`|The namespace of the function|| +|`--tenant`|The tenant of the function|| ### `getstatus` @@ -433,9 +433,9 @@ $ pulsar-admin functions getstatus options Options |Flag|Description|Default| |---|---|---| -|`--name`|The name of the function to delete|| -|`--namespace`|The namespace of the function to delete|| -|`--tenant`|The tenant of the function to delete|| +|`--name`|The name of the function|| +|`--namespace`|The namespace of the function|| +|`--tenant`|The tenant of the function|| ### `list` @@ -449,8 +449,8 @@ $ pulsar-admin functions list options Options |Flag|Description|Default| |---|---|---| -|`--namespace`|The namespace of the function to delete|| -|`--tenant`|The tenant of the function to delete|| +|`--namespace`|The namespace of the function|| +|`--tenant`|The tenant of the function|| ### `querystate`
[GitHub] sijie closed pull request #2418: Fix Description of Options on Pulsar Admin
sijie closed pull request #2418: Fix Description of Options on Pulsar Admin URL: https://github.com/apache/incubator-pulsar/pull/2418 This is a PR merged from a forked repository. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2420: [client] add properties to producer for cpp & python client
sijie commented on a change in pull request #2420: [client] add properties to producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#discussion_r211781198
## File path: pulsar-client-cpp/python/pulsar/__init__.py ##
@@ -321,7 +321,8 @@ def create_producer(self, topic,
                     batching_max_messages=1000,
                     batching_max_allowed_size_in_bytes=128*1024,
                     batching_max_publish_delay_ms=10,
-                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+                    properties=None,
Review comment: done
[GitHub] sijie opened a new pull request #2423: [client] add properties to consumer for cpp & python client
sijie opened a new pull request #2423: [client] add properties to consumer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2423
### Motivation
This is a catch-up change to enable properties for the consumer, as in the Java client.
### Changes
Enable properties on the consumer for both the cpp & python clients.
### Results
Properties are added as metadata for CommandSubscribe. However, there is no way to verify the consumer properties, so I didn't add any specific tests; I just added properties for both the cpp and python clients in the tests, which should exercise the corresponding code path.
[GitHub] srkukarni commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context
srkukarni commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#discussion_r211776412
## File path: pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java ##
@@ -86,7 +86,14 @@
      *
      * @return the instance id
      */
-    String getInstanceId();
+    int getInstanceId();
+
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
Review comment: Just add a comment stating that this might not be accurate during localrun.
[GitHub] srkukarni opened a new pull request #2422: Improve the documentation about publish function
srkukarni opened a new pull request #2422: Improve the documentation about publish function
URL: https://github.com/apache/incubator-pulsar/pull/2422
### Motivation
Cleaned up the comments around publish
### Modifications
Describe the modifications you've done.
### Result
After your change, what will change.
[GitHub] sijie commented on issue #2346: Issue #2330: change getTopicName in MultiTopicsConsumer
sijie commented on issue #2346: Issue #2330: change getTopicName in MultiTopicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/2346#issuecomment-414835396 @jiazhai can you rebase this PR to master? @merlimat can you review it again? we need this for 2.1.1 release
[GitHub] sijie commented on a change in pull request #2420: [client] add properties to producer for cpp & python client
sijie commented on a change in pull request #2420: [client] add properties to producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#discussion_r211773131
## File path: pulsar-client-cpp/python/pulsar/__init__.py ##
@@ -321,7 +321,8 @@ def create_producer(self, topic,
                     batching_max_messages=1000,
                     batching_max_allowed_size_in_bytes=128*1024,
                     batching_max_publish_delay_ms=10,
-                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+                    properties=None,
Review comment: yeah will add
[GitHub] sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context
sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#issuecomment-414834875
> What about python changes?
okay will take a look at python
[GitHub] sijie commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context
sijie commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#discussion_r211772934
## File path: pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java ##
@@ -86,7 +86,14 @@
      *
      * @return the instance id
      */
-    String getInstanceId();
+    int getInstanceId();
+
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
Review comment: How about adding a comment there, saying `getNumInstances` will return -1 if the function is invoked from localrun?
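The `-1` return value discussed in this review comment is a suggestion, not confirmed behaviour. As a minimal sketch under that assumption, function code that splits work across instances could guard against an unknown instance count as below. `InstancePartitioner` and `ownsKey` are hypothetical names, and the sketch also assumes instance ids run from 0 to `getNumInstances() - 1`; only `getInstanceId()` and `getNumInstances()` come from the diff.

```java
/** Hypothetical helper for illustration; not part of the Pulsar Functions API. */
final class InstancePartitioner {
    private InstancePartitioner() {}

    /**
     * Decide whether this instance should handle the given key.
     * instanceId and numInstances stand for the values a function would read
     * from Context#getInstanceId() and Context#getNumInstances().
     */
    static boolean ownsKey(String key, int instanceId, int numInstances) {
        // Assumption: a non-positive count (for example -1 under localrun)
        // means the instance count is unknown, so handle every key.
        if (numInstances <= 0) {
            return true;
        }
        // floorMod keeps the bucket non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numInstances) == instanceId;
    }
}
```

With a known count, exactly one instance id in the range claims each key; with an unknown count the function simply processes everything, which matches a single local instance.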
[GitHub] srkukarni opened a new pull request #2421: Fix bug during user publish
srkukarni opened a new pull request #2421: Fix bug during user publish
URL: https://github.com/apache/incubator-pulsar/pull/2421
### Motivation
This pr fixes two bugs in the path of user publish
1. Stackoverflow error in ContextImpl where getSchema was calling itself
2. Add topicName while building the producer.
### Modifications
Describe the modifications you've done.
### Result
After your change, what will change.
[GitHub] merlimat commented on a change in pull request #2420: [client] add properties to producer for cpp & python client
merlimat commented on a change in pull request #2420: [client] add properties to producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#discussion_r211769327
## File path: pulsar-client-cpp/python/pulsar/__init__.py ##
@@ -321,7 +321,8 @@ def create_producer(self, topic,
                     batching_max_messages=1000,
                     batching_max_allowed_size_in_bytes=128*1024,
                     batching_max_publish_delay_ms=10,
-                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+                    properties=None,
Review comment: We should add `properties` to the pydoc string below
[GitHub] sijie opened a new pull request #2420: [client] add properties to producer for cpp & python client
sijie opened a new pull request #2420: [client] add properties to producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420
### Motivation
This is a catch-up change to enable properties for the producer, as in the Java client.
### Changes
Enable properties on the producer for both the cpp & python clients.
### Results
Properties are added as metadata for CommandProducer. However, there is no way to verify the producer properties, so I didn't add any specific tests; I just added properties for both the cpp and python clients in the tests, which should exercise the corresponding code path.
[GitHub] sijie commented on issue #2418: Fix Description of Options on Pulsar Admin
sijie commented on issue #2418: Fix Description of Options on Pulsar Admin URL: https://github.com/apache/incubator-pulsar/pull/2418#issuecomment-414827155 run cpp tests
[GitHub] srkukarni commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context
srkukarni commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#discussion_r211750888
## File path: pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java ##
@@ -86,7 +86,14 @@
      *
      * @return the instance id
      */
-    String getInstanceId();
+    int getInstanceId();
+
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
Review comment: getNumInstances won't work in the context of localrun.
[GitHub] sijie closed issue #2282: Document the settings to enable authorization
sijie closed issue #2282: Document the settings to enable authorization URL: https://github.com/apache/incubator-pulsar/issues/2282
[GitHub] sijie commented on issue #2282: Document the settings to enable authorization
sijie commented on issue #2282: Document the settings to enable authorization URL: https://github.com/apache/incubator-pulsar/issues/2282#issuecomment-414793036 yeah the documentation is great!
[GitHub] sijie commented on issue #2379: Initializing cluster metadata should handle /managed-ledger path better
sijie commented on issue #2379: Initializing cluster metadata should handle /managed-ledger path better
URL: https://github.com/apache/incubator-pulsar/issues/2379#issuecomment-414792602
@ivankelly I think we should make two changes here:
1) disable initializing /managed-ledgers in managed ledgers.
2) handle the situation where /managed-ledgers already exists when initializing metadata.
> Why is managed-ledgers an exception?
I guess the `managed-ledgers` code existed before other components in Pulsar.
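The second change proposed in this comment amounts to making metadata initialization idempotent. A minimal sketch of that idea follows, using a toy in-memory store rather than Pulsar's real metadata code. `MetadataInitSketch`, `InMemoryStore`, `NodeExistsException` and `/example-path` are hypothetical names for illustration; only the `/managed-ledgers` path comes from the issue.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch; not the actual Pulsar cluster-metadata setup code. */
final class MetadataInitSketch {
    private MetadataInitSketch() {}

    /** Thrown by the toy store when a path is created twice. */
    static final class NodeExistsException extends RuntimeException {
        NodeExistsException(String path) {
            super("already exists: " + path);
        }
    }

    /** Toy in-memory stand-in for a metadata store. */
    static final class InMemoryStore {
        private final Set<String> paths = new TreeSet<>();

        void create(String path) {
            if (!paths.add(path)) {
                throw new NodeExistsException(path);
            }
        }

        boolean exists(String path) {
            return paths.contains(path);
        }
    }

    /**
     * Create every required path, treating "already exists" as success.
     * Returns how many paths were newly created.
     */
    static int initialize(InMemoryStore store, List<String> requiredPaths) {
        int created = 0;
        for (String path : requiredPaths) {
            try {
                store.create(path);
                created++;
            } catch (NodeExistsException e) {
                // The path was created earlier (for example by another
                // component), so there is nothing left to do for it.
            }
        }
        return created;
    }
}
```

Swallowing only the "already exists" error keeps initialization safe to re-run while still surfacing any other failure.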
[GitHub] sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic
sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211723819
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -556,13 +618,90 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) {
-        positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
         }
+        if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+            for (PositionImpl position : positions) {
+                if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) {
+                    messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                } else {
+                    messagesToDeadLetter.add(position);
+                }
+            }
+            if (messagesToDeadLetter.size() > 0) {
+                CountDownLatch latch = new CountDownLatch(messagesToDeadLetter.size());
+                for (PositionImpl position : messagesToDeadLetter) {
+                    cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+                        @Override
+                        public void readEntryComplete(Entry entry, Object ctx) {
+                            if (entry == null) {
+                                log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position);
+                                latch.countDown();
+                            } else {
+                                try {
+                                    ByteBuf headersAndPayload = entry.getDataBuffer();
+                                    MessageImpl msg = MessageImpl.deserialize(headersAndPayload);
+                                    headersAndPayload.retain();
+                                    msg.setReplicatedFrom("DLQ");
+                                    CompletableFuture future = deadLetterTopicProducer.sendAsync(msg);
+                                    future.whenCompleteAsync((messageId, error) -> {
+                                        if (error != null) {
+                                            log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}",
+                                                    name, consumer, deadLetterTopic, error.getMessage(), error);
+                                            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                                            latch.countDown();
+                                        } else {
+                                            cursor.asyncDelete(position, deleteCallback, position);
+                                            redeliveryTracker.remove(position);
Review comment: so the `redeliveryTracker.remove` should be called in `deleteCallback`, right?
[GitHub] sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic
sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211724307
## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java ##
@@ -556,13 +618,90 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) {
-        positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
         }
+        if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+            for (PositionImpl position : positions) {
+                if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) {
+                    messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                } else {
+                    messagesToDeadLetter.add(position);
+                }
+            }
+            if (messagesToDeadLetter.size() > 0) {
+                CountDownLatch latch = new CountDownLatch(messagesToDeadLetter.size());
+                for (PositionImpl position : messagesToDeadLetter) {
+                    cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+                        @Override
+                        public void readEntryComplete(Entry entry, Object ctx) {
+                            if (entry == null) {
+                                log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position);
+                                latch.countDown();
+                            } else {
+                                try {
+                                    ByteBuf headersAndPayload = entry.getDataBuffer();
+                                    MessageImpl msg = MessageImpl.deserialize(headersAndPayload);
+                                    headersAndPayload.retain();
+                                    msg.setReplicatedFrom("DLQ");
+                                    CompletableFuture future = deadLetterTopicProducer.sendAsync(msg);
+                                    future.whenCompleteAsync((messageId, error) -> {
+                                        if (error != null) {
+                                            log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}",
+                                                    name, consumer, deadLetterTopic, error.getMessage(), error);
+                                            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                                            latch.countDown();
+                                        } else {
+                                            cursor.asyncDelete(position, deleteCallback, position);
+                                            redeliveryTracker.remove(position);
+                                            latch.countDown();
+                                        }
+                                    });
+                                } catch (Throwable t) {
+                                    log.error("[{}-{}] Failed to deserialize message at {} {} {} {}", name, consumer,
+                                            entry.getPosition(), entry.getLedgerId(), t.getMessage(), t);
+                                    cursor.asyncDelete(position, deleteCallback, position);
+                                    redeliveryTracker.remove(position);
+                                    entry.release();
+                                    latch.countDown();
+                                }
+                            }
+                        }
+                        @Override
+                        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                            log.error("[{}-{}] Read entries failed {} {}", name, consumer, exception.getMessage(), exception);
+                            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                            latch.countDown();
+                        }
+                    }, null);
+                }
+                try {
Review comment: Not sure I understand the problem here. Can you clarify it?
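The counting rule in the hunk above can be looked at in isolation. The following is a simplified, single-threaded sketch and not the broker code: positions are plain strings, and the class name `RedeliverySketch` and the method `onDeadLetterDeleted` are hypothetical. It replays a position while its redelivery count is at most `maxRedeliveryCount`, routes it to the dead letter list afterwards, and clears the counter only once the dead-lettered entry has been deleted, which is the ordering raised in the review comment on `redeliveryTracker.remove`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the redelivery tracking in the diff. */
final class RedeliverySketch {
    private final Map<String, Integer> redeliveryCounts = new HashMap<>();
    private final int maxRedeliveryCount;

    final List<String> messagesToReplay = new ArrayList<>();
    final List<String> messagesToDeadLetter = new ArrayList<>();

    RedeliverySketch(int maxRedeliveryCount) {
        this.maxRedeliveryCount = maxRedeliveryCount;
    }

    /** Increment the counter for a position and return the new value. */
    int incrementAndGetRedeliveryCount(String position) {
        return redeliveryCounts.merge(position, 1, Integer::sum);
    }

    /** Same branch as in the diff: replay up to the limit, then dead-letter. */
    void redeliver(List<String> positions) {
        for (String position : positions) {
            if (incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) {
                messagesToReplay.add(position);
            } else {
                messagesToDeadLetter.add(position);
            }
        }
    }

    /** Assumed hook: runs after the dead-letter copy is sent and the entry is deleted. */
    void onDeadLetterDeleted(String position) {
        redeliveryCounts.remove(position);
        messagesToDeadLetter.remove(position);
    }

    boolean isTracked(String position) {
        return redeliveryCounts.containsKey(position);
    }
}
```

With a limit of 2, the first two redelivery requests for a position replay it and the third sends it to the dead letter list.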
[GitHub] sijie opened a new pull request #2419: [documentation] cherry-pick deploy-kubernetes documentation changes
sijie opened a new pull request #2419: [documentation] cherry-pick deploy-kubernetes documentation changes URL: https://github.com/apache/incubator-pulsar/pull/2419 Cherry-pick changes from #2363 to 2.1.0 documentation
[incubator-pulsar.wiki] branch master updated: Updated PIP 23: Message Tracing By Interceptors (markdown)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git
The following commit(s) were added to refs/heads/master by this push:
new ed4b0e9 Updated PIP 23: Message Tracing By Interceptors (markdown)
ed4b0e9 is described below
commit ed4b0e9a07bef40cb6aa85f9d430cf16cdfb175c
Author: Sijie Guo
AuthorDate: Tue Aug 21 11:13:08 2018 -0700
Updated PIP 23: Message Tracing By Interceptors (markdown)
---
PIP-23:-Message-Tracing-By-Interceptors.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/PIP-23:-Message-Tracing-By-Interceptors.md b/PIP-23:-Message-Tracing-By-Interceptors.md
index d57d558..58d5336 100644
--- a/PIP-23:-Message-Tracing-By-Interceptors.md
+++ b/PIP-23:-Message-Tracing-By-Interceptors.md
@@ -1,5 +1,5 @@
 - **Status**: Proposed
-- **Author**: [Penghui Li](https://github.com/codelipenghui)
+- **Author**: [Penghui Li](https://github.com/codelipenghui), [Jia Zhai](https://github.com/zhaijack)
 - **Pull Request**: -
 - **Mailing List discussion**:
[GitHub] merlimat commented on issue #2418: Fix Description of Options on Pulsar Admin
merlimat commented on issue #2418: Fix Description of Options on Pulsar Admin URL: https://github.com/apache/incubator-pulsar/pull/2418#issuecomment-414754274 run cpp tests
[GitHub] jerrypeng commented on issue #2414: allow predicates concerning publish time to push down to pulsar
jerrypeng commented on issue #2414: allow predicates concerning publish time to push down to pulsar URL: https://github.com/apache/incubator-pulsar/pull/2414#issuecomment-414744387 rerun java8 tests
[GitHub] massakam commented on issue #2401: Consumer often discards received messages
massakam commented on issue #2401: Consumer often discards received messages URL: https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-414642945 This phenomenon seems to happen when the consumer is receiving a lot of messages. For example, it is prone to happen when the consumer connects to a topic with a large number of messages in the backlog.
[incubator-pulsar.wiki] branch master updated: Created PIP 23: Message Tracing By Interceptors (markdown)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new e0388dd Created PIP 23: Message Tracing By Interceptors (markdown) e0388dd is described below commit e0388dd8484f562109c6a4e689983883d7c1beba Author: Sijie Guo AuthorDate: Tue Aug 21 03:47:09 2018 -0700 Created PIP 23: Message Tracing By Interceptors (markdown) --- PIP-23:-Message-Tracing-By-Interceptors.md | 239 + 1 file changed, 239 insertions(+) diff --git a/PIP-23:-Message-Tracing-By-Interceptors.md b/PIP-23:-Message-Tracing-By-Interceptors.md new file mode 100644 index 000..d57d558 --- /dev/null +++ b/PIP-23:-Message-Tracing-By-Interceptors.md @@ -0,0 +1,239 @@ +- **Status**: Proposed +- **Author**: [Penghui Li](https://github.com/codelipenghui) +- **Pull Request**: - +- **Mailing List discussion**: + +## Requirement & Overview + +This is related to issue #2280. +And for Pulsar itself, That will be useful also to track performance issues and understand more of the latency breakdowns. Tracing is the easiest way to do: "show me a request with high-latency and show where the latency (for this particular request) was coming from. + +## GOALS + +A good approach to that, is to integrate a tracing standard, either OpenTracing or OpenCensus, at both bookkeeper and pulsar. so their applications can integrate with OpenTracing or OpenCensus as well, then application's traceId, spanId can be passed all way from producer to broker to bookie and to consumer. Then those traces can be collected and fed into an ElasticSearch cluster for query. + +## IMPLEMENTATION + +Integration with a tracing system typically requires tight integration with applications. Especially if Pulsar is used in a data pipeline, Pulsar should be able to provide mechanism to be able to propagate trace context end-to-end. 
That says Pulsar doesn’t have to integrate with any tracing framework directly, but it has to provide mechanism for applications to do so. This can be done by: + +- Use `properties` in message header for propagating trace context. +- Provide an interface to intercept message pipeline in Pulsar, which allows applications customize their own tracing behaviors and meet their requirements. + +This PIP introduces **Interceptor** to examine (and potentially modify) messages at key places during the lifecycle of a Pulsar message. The interceptors include `ProducerInterceptor` for intercepting messages at Producer side, and `ConsumerInterceptor` for intercepting messages at Consumer side. + +### Producer Interceptor + +The ProducerInterceptor intercept messages before sending them and on receiving acknowledgement from brokers. The interfaces for a ProducerInterceptor are defined as below: + +```java +package org.apache.pulsar.client.api; + +/** + * A plugin interface that allows you to intercept (and possibly mutate) the + * messages received by the producer before they are published to the Pulsar + * brokers. + * + * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but + * not propagated further. + * + * ProducerInterceptor callbacks may be called from multiple threads. Interceptor + * implementation must ensure thread-safety, if needed. + */ +public interface ProducerInterceptor extends AutoCloseable { + +/** + * Close the interceptor. + */ +void close(); + +/** + * This is called from {@link Producer#send(Object)} and {@link + * Producer#sendAsync(Object)} methods, before + * send the message to the brokers. This method is allowed to modify the + * record, in which case, the new record + * will be returned. + * + * Any exception thrown by this method will be caught by the caller and + * logged, but not propagated further. 
+ * + * Since the producer may run multiple interceptors, a particular + * interceptor's {@link #beforeSend(Message)} callback will be called in the + * order specified by + * {@link ProducerBuilder#intercept(ProducerInterceptor[])}. + * + * The first interceptor in the list gets the message passed from the client, + * the following interceptor will be passed the message returned by the + * previous interceptor, and so on. Since interceptors are allowed to modify + * messages, interceptors may potentially get the message already modified by + * other interceptors. However, building a pipeline of mutable interceptors + * that depend on the output of the previous interceptor is discouraged, + * because of potential side-effects caused by interceptors potentially + * failing to modify the message and throwing an exception. If one of the + * interceptors in the list throws an exception from + * {@link#beforeSen
[incubator-pulsar.wiki] branch master updated: Updated Home (markdown)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new 7d081a9 Updated Home (markdown) 7d081a9 is described below commit 7d081a92f165cb1e0f3067f4709b80cd5da23d28 Author: Sijie Guo AuthorDate: Tue Aug 21 03:39:08 2018 -0700 Updated Home (markdown) --- Home.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Home.md b/Home.md index cf0aa19..74324a2 100644 --- a/Home.md +++ b/Home.md @@ -5,10 +5,11 @@ We encourage to document any big change or feature or any addition to public user APIs through a design document to be discussed with the community. -*Next Proposal Number: 23* +*Next Proposal Number: 24* ### Proposed +* [[PIP 23: Message Tracing By Interceptors]] * [[PIP 22: Pulsar Dead Letter Topic]] * [[PIP 21: Pulsar Edge Component]]
[GitHub] k2la opened a new pull request #2418: Fix Description of Options on Pulsar Admin
k2la opened a new pull request #2418: Fix Description of Options on Pulsar Admin URL: https://github.com/apache/incubator-pulsar/pull/2418 ### Motivation https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#get-1 https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#getstatus https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#list-2 Every `to delete` in the option descriptions at the above URLs is unnecessary. ### Modifications Remove `to delete`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on a change in pull request #2397: Integrate Functions with Schema
srkukarni commented on a change in pull request #2397: Integrate Functions with Schema URL: https://github.com/apache/incubator-pulsar/pull/2397#discussion_r211515618 ## File path: pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java ## @@ -161,15 +161,19 @@ /** * Publish an object using serDe for serializing to the topic - * @param topicName The name of the topic for publishing - * @param object The object that needs to be published - * @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing + * + * @param topicName + *The name of the topic for publishing + * @param object + *The object that needs to be published + * @param topicsPattern + *Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class * @return A future that completes when the framework is done publishing the message */ - CompletableFuture publish(String topicName, O object, String serDeClassName); + CompletableFuture publish(String topicName, O object, String schemaType); Review comment: SerDes are not being deprecated. The only thing being deprecated is DefaultSerde. I therefore believe we should add another interface that supports schemas rather than delete this one.
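The suggestion in the review comment above can be sketched roughly as follows: keep the existing SerDe-based `publish` method and add a schema-based method next to it instead of replacing it. This is only a minimal, self-contained illustration, not the actual `Context` interface. `PublishContext`, `publishWithSchema`, and `RecordingContext` are hypothetical names, and `CompletableFuture<Void>` is an assumed return type. A distinct method name is assumed because both variants would otherwise have the identical signature `(String, O, String)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the Functions Context interface; not the real API.
interface PublishContext {
    // Existing SerDe-based method, kept unchanged so current callers still compile.
    <O> CompletableFuture<Void> publish(String topicName, O object, String serDeClassName);

    // Hypothetical schema-based addition. It uses a distinct name because an
    // overload with the same (String, O, String) parameters would not compile.
    <O> CompletableFuture<Void> publishWithSchema(String topicName, O object, String schemaType);
}

// Toy implementation that only records which publishing path was taken.
class RecordingContext implements PublishContext {
    final List<String> calls = new ArrayList<>();

    @Override
    public <O> CompletableFuture<Void> publish(String topicName, O object, String serDeClassName) {
        calls.add("serde:" + serDeClassName + "@" + topicName);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public <O> CompletableFuture<Void> publishWithSchema(String topicName, O object, String schemaType) {
        calls.add("schema:" + schemaType + "@" + topicName);
        return CompletableFuture.completedFuture(null);
    }
}
```

With this shape, functions that pass a SerDe class name keep working, while new code can opt into a builtin schema type such as "avro".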
[GitHub] srkukarni commented on a change in pull request #2397: Integrate Functions with Schema
srkukarni commented on a change in pull request #2397: Integrate Functions with Schema URL: https://github.com/apache/incubator-pulsar/pull/2397#discussion_r211515762 ## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java ## @@ -469,20 +477,56 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep // set source spec // source spec classname should be empty so that the default pulsar source will be used SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); Review comment: SerDes are not deprecated; only DefaultSerde is.