[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211836331
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##
 @@ -2662,4 +2662,108 @@ public void received(Consumer consumer, Message 
message)
 assertEquals(latch.getCount(), 1);
 consumer.close();
 }
+
+@Test
+public void testDeadLetterTopic() throws Exception {
+
+final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+final int maxRedeliveryCount = 2;
+
+final int sendMessages = 100;
+
+Producer producer = pulsarClient.newProducer(Schema.BYTES)
+.topic(topic)
+.create();
+
+for (int i = 0; i < sendMessages; i++) {
+producer.send("Hello Pulsar!".getBytes());
+}
+
+producer.close();
+
+Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
+.topic(topic)
+.subscriptionName("my-subscription")
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(3, TimeUnit.SECONDS)
+.maxRedeliveryCount(maxRedeliveryCount)
+.receiverQueueSize(100)
+.maxUnackedMessagesPerConsumer(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+.subscribe();
+
+Consumer deadLetterConsumer = 
pulsarClient.newConsumer(Schema.BYTES)
+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+.subscriptionName("my-subscription")
+.subscribe();
+
+int totalReceived = 0;
+do {
+consumer.receive();
+totalReceived++;
+} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+int totalInDeadLetter = 0;
+do {
+Message message = deadLetterConsumer.receive();
+consumer.acknowledge(message);
+totalInDeadLetter++;
+} while (totalInDeadLetter < sendMessages);
+
 
 Review comment:
   Could we assert some thing to do a verification? such as messages are all 
received and acked?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211835573
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##
 @@ -2662,4 +2662,108 @@ public void received(Consumer consumer, Message 
message)
 assertEquals(latch.getCount(), 1);
 consumer.close();
 }
+
+@Test
+public void testDeadLetterTopic() throws Exception {
+
+final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+final int maxRedeliveryCount = 2;
+
+final int sendMessages = 100;
+
+Producer producer = pulsarClient.newProducer(Schema.BYTES)
+.topic(topic)
+.create();
+
+for (int i = 0; i < sendMessages; i++) {
+producer.send("Hello Pulsar!".getBytes());
 
 Review comment:
   How about combine "i" together with the message content? it may be useful a 
little to debug. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211835151
 
 

 ##
 File path: pulsar-broker/src/main/java/org/apache/pulsar/utils/Quorum.java
 ##
 @@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class Quorum {
 
 Review comment:
   It maybe better to add some comments, such as how to use it, and the meaning 
of each public methods. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2423: [client] add properties to consumer for cpp & python client

2018-08-21 Thread GitBox
merlimat commented on issue #2423: [client] add properties to consumer for cpp 
& python client
URL: https://github.com/apache/incubator-pulsar/pull/2423#issuecomment-414917527
 
 
   @sijie This seems a dup of #2420, which one should get merged?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2423: [client] add properties to consumer for cpp & python client

2018-08-21 Thread GitBox
merlimat commented on issue #2423: [client] add properties to consumer for cpp 
& python client
URL: https://github.com/apache/incubator-pulsar/pull/2423#issuecomment-414917313
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter 
Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211831981
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -556,13 +618,90 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
 @Override
 public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List positions) {
-positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
 if (log.isDebugEnabled()) {
 log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
 }
+if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+for (PositionImpl position : positions) {
+if (redeliveryTracker.incrementAndGetRedeliveryCount(position) 
<= maxRedeliveryCount) {
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+} else {
+messagesToDeadLetter.add(position);
+}
+}
+if (messagesToDeadLetter.size() > 0) {
+CountDownLatch latch = new 
CountDownLatch(messagesToDeadLetter.size());
+for (PositionImpl position : messagesToDeadLetter) {
+cursor.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+@Override
+public void readEntryComplete(Entry entry, Object ctx) 
{
+if (entry == null) {
+log.error("[{}-{}] Read an null entry from 
cursor {}", name, consumer, position);
+latch.countDown();
+} else {
+try {
+ByteBuf headersAndPayload = 
entry.getDataBuffer();
+MessageImpl msg = 
MessageImpl.deserialize(headersAndPayload);
+headersAndPayload.retain();
+msg.setReplicatedFrom("DLQ");
+CompletableFuture future = 
deadLetterTopicProducer.sendAsync(msg);
+future.whenCompleteAsync((messageId, 
error) -> {
+if (error != null) {
+log.error("[{}-{}] Fail to send 
message to dead letter topic {} {} {}",
+name, consumer, 
deadLetterTopic, error.getMessage(), error);
+
messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+latch.countDown();
+} else {
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+latch.countDown();
+}
+});
+} catch (Throwable t) {
+log.error("[{}-{}] Failed to deserialize 
message at {} {} {} {}", name, consumer,
+entry.getPosition(), 
entry.getLedgerId(), t.getMessage(), t);
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+entry.release();
+latch.countDown();
+}
+}
+}
+@Override
+public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+log.error("[{}-{}] Read entries failed {} {}", 
name, consumer, exception.getMessage(), exception);
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+latch.countDown();
+}
+}, null);
+}
+try {
 
 Review comment:
   I'm already have a  
[commit](https://github.com/apache/incubator-pulsar/pull/2400/commits/dc31b2fcba17bf304f9a4cfdda7a5b7e0508b65b)
 for this, help me review it. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra

[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter 
Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211831981
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -556,13 +618,90 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
 @Override
 public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List positions) {
-positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
 if (log.isDebugEnabled()) {
 log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
 }
+if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+for (PositionImpl position : positions) {
+if (redeliveryTracker.incrementAndGetRedeliveryCount(position) 
<= maxRedeliveryCount) {
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+} else {
+messagesToDeadLetter.add(position);
+}
+}
+if (messagesToDeadLetter.size() > 0) {
+CountDownLatch latch = new 
CountDownLatch(messagesToDeadLetter.size());
+for (PositionImpl position : messagesToDeadLetter) {
+cursor.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+@Override
+public void readEntryComplete(Entry entry, Object ctx) 
{
+if (entry == null) {
+log.error("[{}-{}] Read an null entry from 
cursor {}", name, consumer, position);
+latch.countDown();
+} else {
+try {
+ByteBuf headersAndPayload = 
entry.getDataBuffer();
+MessageImpl msg = 
MessageImpl.deserialize(headersAndPayload);
+headersAndPayload.retain();
+msg.setReplicatedFrom("DLQ");
+CompletableFuture future = 
deadLetterTopicProducer.sendAsync(msg);
+future.whenCompleteAsync((messageId, 
error) -> {
+if (error != null) {
+log.error("[{}-{}] Fail to send 
message to dead letter topic {} {} {}",
+name, consumer, 
deadLetterTopic, error.getMessage(), error);
+
messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+latch.countDown();
+} else {
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+latch.countDown();
+}
+});
+} catch (Throwable t) {
+log.error("[{}-{}] Failed to deserialize 
message at {} {} {} {}", name, consumer,
+entry.getPosition(), 
entry.getLedgerId(), t.getMessage(), t);
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+entry.release();
+latch.countDown();
+}
+}
+}
+@Override
+public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+log.error("[{}-{}] Read entries failed {} {}", 
name, consumer, exception.getMessage(), exception);
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+latch.countDown();
+}
+}, null);
+}
+try {
 
 Review comment:
   I'm already have a commit for this 
[commit](https://github.com/apache/incubator-pulsar/pull/2400/commits/dc31b2fcba17bf304f9a4cfdda7a5b7e0508b65b),
 help me review it. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...

[GitHub] srkukarni closed pull request #2422: Improve the documentation about publish function

2018-08-21 Thread GitBox
srkukarni closed pull request #2422: Improve the documentation about publish 
function
URL: https://github.com/apache/incubator-pulsar/pull/2422
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index fba82a7579..2856b7cab0 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -166,11 +166,11 @@
  *The name of the topic for publishing
  * @param object
  *The object that needs to be published
- * @param topicsPattern
+ * @param schemaOrSerdeClassName
  *Either a builtin schema type (eg: "avro", "json", 
"protobuf") or the class name of the custom schema class
  * @return A future that completes when the framework is done publishing 
the message
  */
- CompletableFuture publish(String topicName, O object, String 
schemaType);
+ CompletableFuture publish(String topicName, O object, String 
schemaOrSerdeClassName);
 
 /**
  * Publish an object to the topic using default schemas
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 18e1fe3514..7265e456cc 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -259,8 +259,8 @@ public ByteBuffer getState(String key) {
 
 @SuppressWarnings("unchecked")
 @Override
-public  CompletableFuture publish(String topicName, O object, 
String serDeClassName) {
-return publish(topicName, object, (Schema) 
topicSchema.getSchema(topicName, object, serDeClassName));
+public  CompletableFuture publish(String topicName, O object, 
String schemaOrSerdeClassName) {
+return publish(topicName, object, (Schema) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
 }
 
 @SuppressWarnings("unchecked")


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Improve the documentation about publish function (#2422)

2018-08-21 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ddc2ad  Improve the documentation about publish function (#2422)
6ddc2ad is described below

commit 6ddc2add9402cdb2ec5b57646ca08812ca00bd51
Author: Sanjeev Kulkarni 
AuthorDate: Tue Aug 21 20:18:28 2018 -0700

Improve the documentation about publish function (#2422)
---
 .../src/main/java/org/apache/pulsar/functions/api/Context.java| 4 ++--
 .../main/java/org/apache/pulsar/functions/instance/ContextImpl.java   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index fba82a7..2856b7c 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -166,11 +166,11 @@ public interface Context {
  *The name of the topic for publishing
  * @param object
  *The object that needs to be published
- * @param topicsPattern
+ * @param schemaOrSerdeClassName
  *Either a builtin schema type (eg: "avro", "json", 
"protobuf") or the class name of the custom schema class
  * @return A future that completes when the framework is done publishing 
the message
  */
- CompletableFuture publish(String topicName, O object, String 
schemaType);
+ CompletableFuture publish(String topicName, O object, String 
schemaOrSerdeClassName);
 
 /**
  * Publish an object to the topic using default schemas
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 18e1fe3..7265e45 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -259,8 +259,8 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
 
 @SuppressWarnings("unchecked")
 @Override
-public  CompletableFuture publish(String topicName, O object, 
String serDeClassName) {
-return publish(topicName, object, (Schema) 
topicSchema.getSchema(topicName, object, serDeClassName));
+public  CompletableFuture publish(String topicName, O object, 
String schemaOrSerdeClassName) {
+return publish(topicName, object, (Schema) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
 }
 
 @SuppressWarnings("unchecked")



[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter 
Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211807555
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -556,13 +618,90 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
 @Override
 public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List positions) {
-positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
 if (log.isDebugEnabled()) {
 log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
 }
+if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+for (PositionImpl position : positions) {
+if (redeliveryTracker.incrementAndGetRedeliveryCount(position) 
<= maxRedeliveryCount) {
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+} else {
+messagesToDeadLetter.add(position);
+}
+}
+if (messagesToDeadLetter.size() > 0) {
+CountDownLatch latch = new 
CountDownLatch(messagesToDeadLetter.size());
+for (PositionImpl position : messagesToDeadLetter) {
+cursor.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+@Override
+public void readEntryComplete(Entry entry, Object ctx) 
{
+if (entry == null) {
+log.error("[{}-{}] Read an null entry from 
cursor {}", name, consumer, position);
+latch.countDown();
+} else {
+try {
+ByteBuf headersAndPayload = 
entry.getDataBuffer();
+MessageImpl msg = 
MessageImpl.deserialize(headersAndPayload);
+headersAndPayload.retain();
+msg.setReplicatedFrom("DLQ");
+CompletableFuture future = 
deadLetterTopicProducer.sendAsync(msg);
+future.whenCompleteAsync((messageId, 
error) -> {
+if (error != null) {
+log.error("[{}-{}] Fail to send 
message to dead letter topic {} {} {}",
+name, consumer, 
deadLetterTopic, error.getMessage(), error);
+
messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+latch.countDown();
+} else {
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+latch.countDown();
+}
+});
+} catch (Throwable t) {
+log.error("[{}-{}] Failed to deserialize 
message at {} {} {} {}", name, consumer,
+entry.getPosition(), 
entry.getLedgerId(), t.getMessage(), t);
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+entry.release();
+latch.countDown();
+}
+}
+}
+@Override
+public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+log.error("[{}-{}] Read entries failed {} {}", 
name, consumer, exception.getMessage(), exception);
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+latch.countDown();
+}
+}, null);
+}
+try {
 
 Review comment:
   I previously thought that call readMoreEntries() should wait for callbacks 
complete. so the client must wait for callbacks complete.
   
   After discuss with Jia Zhai,  we thought that readMoreEntries() should wait 
for callbacks complete but client do not need to wait callbacks complete. 
   
   But another problem is when to call readMoreEntries(). on every callback? or 
pending the callbacks then readMoreEntries() only once.
   
   Another possible issue is w

[GitHub] rdhabalia commented on issue #2416: Fix: authorization while redirecting function admin call

2018-08-21 Thread GitBox
rdhabalia commented on issue #2416: Fix: authorization while redirecting 
function admin call
URL: https://github.com/apache/incubator-pulsar/pull/2416#issuecomment-414884730
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter 
Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211807555
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -556,13 +618,90 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
 @Override
 public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List positions) {
-positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
 if (log.isDebugEnabled()) {
 log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
 }
+if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+for (PositionImpl position : positions) {
+if (redeliveryTracker.incrementAndGetRedeliveryCount(position) 
<= maxRedeliveryCount) {
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+} else {
+messagesToDeadLetter.add(position);
+}
+}
+if (messagesToDeadLetter.size() > 0) {
+CountDownLatch latch = new 
CountDownLatch(messagesToDeadLetter.size());
+for (PositionImpl position : messagesToDeadLetter) {
+cursor.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+@Override
+public void readEntryComplete(Entry entry, Object ctx) 
{
+if (entry == null) {
+log.error("[{}-{}] Read an null entry from 
cursor {}", name, consumer, position);
+latch.countDown();
+} else {
+try {
+ByteBuf headersAndPayload = 
entry.getDataBuffer();
+MessageImpl msg = 
MessageImpl.deserialize(headersAndPayload);
+headersAndPayload.retain();
+msg.setReplicatedFrom("DLQ");
+CompletableFuture future = 
deadLetterTopicProducer.sendAsync(msg);
+future.whenCompleteAsync((messageId, 
error) -> {
+if (error != null) {
+log.error("[{}-{}] Fail to send 
message to dead letter topic {} {} {}",
+name, consumer, 
deadLetterTopic, error.getMessage(), error);
+
messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+latch.countDown();
+} else {
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+latch.countDown();
+}
+});
+} catch (Throwable t) {
+log.error("[{}-{}] Failed to deserialize 
message at {} {} {} {}", name, consumer,
+entry.getPosition(), 
entry.getLedgerId(), t.getMessage(), t);
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+entry.release();
+latch.countDown();
+}
+}
+}
+@Override
+public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+log.error("[{}-{}] Read entries failed {} {}", 
name, consumer, exception.getMessage(), exception);
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+latch.countDown();
+}
+}, null);
+}
+try {
 
 Review comment:
   Oh, I thought that if the last redeliverUnacknowledgedMessages is return but 
callbacks not complete yet, then another redeliverUnacknowledgedMessages is 
coming,  if last messagesToReplay is not delete yet, so that current 
messagesToReplay has the last messagesToReplay items.
   And, I thought that call readMoreEntries() should wait for callbacks 
complete. 
   
   


This is an automated message fr

[GitHub] srkukarni commented on issue #2422: Improve the documentation about publish function

2018-08-21 Thread GitBox
srkukarni commented on issue #2422: Improve the documentation about publish 
function
URL: https://github.com/apache/incubator-pulsar/pull/2422#issuecomment-414859049
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2422: Improve the documentation about publish function

2018-08-21 Thread GitBox
srkukarni commented on issue #2422: Improve the documentation about publish 
function
URL: https://github.com/apache/incubator-pulsar/pull/2422#issuecomment-414859021
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2422: Improve the documentation about publish function

2018-08-21 Thread GitBox
srkukarni commented on issue #2422: Improve the documentation about publish 
function
URL: https://github.com/apache/incubator-pulsar/pull/2422#issuecomment-414858935
 
 
   run cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2423: [client] add properties to consumer for cpp & python client

2018-08-21 Thread GitBox
sijie commented on a change in pull request #2423: [client] add properties to 
consumer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2423#discussion_r211793997
 
 

 ##
 File path: pulsar-client-cpp/docker-build.sh
 ##
 @@ -41,4 +41,4 @@ DOCKER_CMD="docker run -i -v $ROOT_DIR:/pulsar $IMAGE"
 find . -name CMakeCache.txt | xargs rm -f
 find . -name CMakeFiles | xargs rm -rf
 
-$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && 
make check-format && make"
+$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && 
make format && make check-format && make"
 
 Review comment:
   that was merged by mistake. revert it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #2421: Fix bug during user publish

2018-08-21 Thread GitBox
rdhabalia commented on a change in pull request #2421: Fix bug during user 
publish
URL: https://github.com/apache/incubator-pulsar/pull/2421#discussion_r211787361
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 ##
 @@ -269,9 +269,11 @@ public ByteBuffer getState(String key) {
 
 if (producer == null) {
 try {
-Producer newProducer = ((ProducerBuilderImpl) 
producerBuilder.clone()).schema(schema).create();
+Producer newProducer = ((ProducerBuilderImpl) 
producerBuilder.clone())
 
 Review comment:
   can we please add unit-test here which covers this path.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #2423: [client] add properties to consumer for cpp & python client

2018-08-21 Thread GitBox
merlimat commented on a change in pull request #2423: [client] add properties 
to consumer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2423#discussion_r211785148
 
 

 ##
 File path: pulsar-client-cpp/docker-build.sh
 ##
 @@ -41,4 +41,4 @@ DOCKER_CMD="docker run -i -v $ROOT_DIR:/pulsar $IMAGE"
 find . -name CMakeCache.txt | xargs rm -f
 find . -name CMakeFiles | xargs rm -rf
 
-$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && 
make check-format && make"
+$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && 
make format && make check-format && make"
 
 Review comment:
   `make format` should not be performed always (though it would be nice to 
expose it as a script that runs inside docker to do it) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: remove 'to delete' (#2418)

2018-08-21 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3bd1ad9  remove 'to delete' (#2418)
3bd1ad9 is described below

commit 3bd1ad9b999c3a9b307f856f9022049ca2166bad
Author: Yuto Furuta 
AuthorDate: Wed Aug 22 07:44:14 2018 +0900

remove 'to delete' (#2418)

### Motivation

https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#get-1
https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#getstatus
https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#list-2

All `to delete` in description of options of the above URLs are unnecessary.

### Modifications

Remove `to delete`.
---
 site2/docs/reference-pulsar-admin.md | 16 
 .../version-2.1.0-incubating/reference-pulsar-admin.md   | 16 
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 51e5713..48fec1a 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -416,9 +416,9 @@ $ pulsar-admin functions get options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `getstatus`
@@ -432,9 +432,9 @@ $ pulsar-admin functions getstatus options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `list`
@@ -448,8 +448,8 @@ $ pulsar-admin functions list options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `querystate`
diff --git 
a/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
 
b/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
index 43d3ef6..e1d2260 100644
--- 
a/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
+++ 
b/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
@@ -417,9 +417,9 @@ $ pulsar-admin functions get options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `getstatus`
@@ -433,9 +433,9 @@ $ pulsar-admin functions getstatus options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `list`
@@ -449,8 +449,8 @@ $ pulsar-admin functions list options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `querystate`



[GitHub] sijie closed pull request #2418: Fix Description of Options on Pulsar Admin

2018-08-21 Thread GitBox
sijie closed pull request #2418: Fix Description of Options on Pulsar Admin
URL: https://github.com/apache/incubator-pulsar/pull/2418
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 51e5713fff..48fec1a024 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -416,9 +416,9 @@ $ pulsar-admin functions get options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `getstatus`
@@ -432,9 +432,9 @@ $ pulsar-admin functions getstatus options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `list`
@@ -448,8 +448,8 @@ $ pulsar-admin functions list options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `querystate`
diff --git 
a/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
 
b/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
index 43d3ef61fd..e1d2260bf6 100644
--- 
a/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
+++ 
b/site2/website/versioned_docs/version-2.1.0-incubating/reference-pulsar-admin.md
@@ -417,9 +417,9 @@ $ pulsar-admin functions get options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `getstatus`
@@ -433,9 +433,9 @@ $ pulsar-admin functions getstatus options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--name`|The name of the function to delete||
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--name`|The name of the function||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `list`
@@ -449,8 +449,8 @@ $ pulsar-admin functions list options
 Options
 |Flag|Description|Default|
 |---|---|---|
-|`--namespace`|The namespace of the function to delete||
-|`--tenant`|The tenant of the function to delete||
+|`--namespace`|The namespace of the function||
+|`--tenant`|The tenant of the function||
 
 
 ### `querystate`


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2420: [client] add properties to producer for cpp & python client

2018-08-21 Thread GitBox
sijie commented on a change in pull request #2420: [client] add properties to 
producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#discussion_r211781198
 
 

 ##
 File path: pulsar-client-cpp/python/pulsar/__init__.py
 ##
 @@ -321,7 +321,8 @@ def create_producer(self, topic,
 batching_max_messages=1000,
 batching_max_allowed_size_in_bytes=128*1024,
 batching_max_publish_delay_ms=10,
-
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+properties=None,
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new pull request #2423: [client] add properties to consumer for cpp & python client

2018-08-21 Thread GitBox
sijie opened a new pull request #2423: [client] add properties to consumer for 
cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2423
 
 
### Motivation
   
   This is a caught-up change to enable properties for consumer as java clients.
   
### Changes
   
   Enable properties on consumer for both cpp & python client
   
### Results
   
   Properties are added as metadata for CommandSubscribe. However there is no 
way
   to verify the consumer properties. so I didn't add any specific tests, just
   adding properties for both cpp and python clients in the tests, that should
   excerise the corresponding code path.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context

2018-08-21 Thread GitBox
srkukarni commented on a change in pull request #2411: [functions] change 
instance id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#discussion_r211776412
 
 

 ##
 File path: 
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 ##
 @@ -86,7 +86,14 @@
  *
  * @return the instance id
  */
-String getInstanceId();
+int getInstanceId();
+
+/**
+ * Get the number of instances that invoke this function.
+ *
+ * @return the number of instances that invoke this function.
+ */
+int getNumInstances();
 
 Review comment:
   Just add a comment stating that this might not be accurate during localrun


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #2422: Improve the documentation about publish function

2018-08-21 Thread GitBox
srkukarni opened a new pull request #2422: Improve the documentation about 
publish function
URL: https://github.com/apache/incubator-pulsar/pull/2422
 
 
   ### Motivation
   Cleaned up the comments around publish
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2346: Issue #2330: change getTopicName in MultiTopicsConsumer

2018-08-21 Thread GitBox
sijie commented on issue #2346: Issue #2330: change getTopicName in 
MultiTopicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/2346#issuecomment-414835396
 
 
   @jiazhai can you rebase this PR to master? 
   
   @merlimat can you review it again? we need this for 2.1.1 release


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2420: [client] add properties to producer for cpp & python client

2018-08-21 Thread GitBox
sijie commented on a change in pull request #2420: [client] add properties to 
producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#discussion_r211773131
 
 

 ##
 File path: pulsar-client-cpp/python/pulsar/__init__.py
 ##
 @@ -321,7 +321,8 @@ def create_producer(self, topic,
 batching_max_messages=1000,
 batching_max_allowed_size_in_bytes=128*1024,
 batching_max_publish_delay_ms=10,
-
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+properties=None,
 
 Review comment:
   yeah will add


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context

2018-08-21 Thread GitBox
sijie commented on issue #2411: [functions] change instance id from string to 
int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#issuecomment-414834875
 
 
   > What about python changes?
   
   okay will take a look at python 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context

2018-08-21 Thread GitBox
sijie commented on a change in pull request #2411: [functions] change instance 
id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#discussion_r211772934
 
 

 ##
 File path: 
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 ##
 @@ -86,7 +86,14 @@
  *
  * @return the instance id
  */
-String getInstanceId();
+int getInstanceId();
+
+/**
+ * Get the number of instances that invoke this function.
+ *
+ * @return the number of instances that invoke this function.
+ */
+int getNumInstances();
 
 Review comment:
   how about adding a comment there, saying `getNumInstances` will return -1 if 
the function is invoked from localrun?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #2421: Fix bug during user publish

2018-08-21 Thread GitBox
srkukarni opened a new pull request #2421: Fix bug during user publish
URL: https://github.com/apache/incubator-pulsar/pull/2421
 
 
   ### Motivation
   
   This pr fixes two bugs in the path of user publish
   1. Stackoverflow error in ContextImpl where getSchema was calling itself
   2. Add topicName while building the producer.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #2420: [client] add properties to producer for cpp & python client

2018-08-21 Thread GitBox
merlimat commented on a change in pull request #2420: [client] add properties 
to producer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#discussion_r211769327
 
 

 ##
 File path: pulsar-client-cpp/python/pulsar/__init__.py
 ##
 @@ -321,7 +321,8 @@ def create_producer(self, topic,
 batching_max_messages=1000,
 batching_max_allowed_size_in_bytes=128*1024,
 batching_max_publish_delay_ms=10,
-
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+properties=None,
 
 Review comment:
   We should add `properties` to the pydoc string below


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new pull request #2420: [client] add properties to producer for cpp & python client

2018-08-21 Thread GitBox
sijie opened a new pull request #2420: [client] add properties to producer for 
cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2420
 
 
### Motivation
   
   This is a caught-up change to enable properties for producer as java clients.
   
### Changes
   
   Enable properties on producer for both cpp & python client
   
### Results
   
   Properties are added as metadata for CommandProducer. However there is no way
   to verify the producer properties. so I didn't add any specific tests, just
   adding properties for both cpp and python clients in the tests, that should
   excerise the corresponding code path.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2418: Fix Description of Options on Pulsar Admin

2018-08-21 Thread GitBox
sijie commented on issue #2418: Fix Description of Options on Pulsar Admin
URL: https://github.com/apache/incubator-pulsar/pull/2418#issuecomment-414827155
 
 
   run cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2411: [functions] change instance id from string to int and expose number of instances in context

2018-08-21 Thread GitBox
srkukarni commented on a change in pull request #2411: [functions] change 
instance id from string to int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#discussion_r211750888
 
 

 ##
 File path: 
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 ##
 @@ -86,7 +86,14 @@
  *
  * @return the instance id
  */
-String getInstanceId();
+int getInstanceId();
+
+/**
+ * Get the number of instances that invoke this function.
+ *
+ * @return the number of instances that invoke this function.
+ */
+int getNumInstances();
 
 Review comment:
   getNumInstances won't work in the context of localrun.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed issue #2282: Document the settings to enable authorization

2018-08-21 Thread GitBox
sijie closed issue #2282: Document the settings to enable authorization
URL: https://github.com/apache/incubator-pulsar/issues/2282
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2282: Document the settings to enable authorization

2018-08-21 Thread GitBox
sijie commented on issue #2282: Document the settings to enable authorization
URL: 
https://github.com/apache/incubator-pulsar/issues/2282#issuecomment-414793036
 
 
   yeah the documentation is great!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2379: Initializing cluster metadata should handle /managed-ledger path better

2018-08-21 Thread GitBox
sijie commented on issue #2379: Initializing cluster metadata should handle 
/managed-ledger path better
URL: 
https://github.com/apache/incubator-pulsar/issues/2379#issuecomment-414792602
 
 
   @ivankelly I think we should two changes here:
   
   1) disable initializing /managed-ledgers in managed ledgers.
   2) handle /managed-ledgers exists situtation on initializing metadata.
   
   > Why is managed-ledgers an exception?
   
   I guess `managed-ledgers` code exists before other components in pulsar


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211723819
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -556,13 +618,90 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
 @Override
 public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List positions) {
-positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
 if (log.isDebugEnabled()) {
 log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
 }
+if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+for (PositionImpl position : positions) {
+if (redeliveryTracker.incrementAndGetRedeliveryCount(position) 
<= maxRedeliveryCount) {
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+} else {
+messagesToDeadLetter.add(position);
+}
+}
+if (messagesToDeadLetter.size() > 0) {
+CountDownLatch latch = new 
CountDownLatch(messagesToDeadLetter.size());
+for (PositionImpl position : messagesToDeadLetter) {
+cursor.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+@Override
+public void readEntryComplete(Entry entry, Object ctx) 
{
+if (entry == null) {
+log.error("[{}-{}] Read an null entry from 
cursor {}", name, consumer, position);
+latch.countDown();
+} else {
+try {
+ByteBuf headersAndPayload = 
entry.getDataBuffer();
+MessageImpl msg = 
MessageImpl.deserialize(headersAndPayload);
+headersAndPayload.retain();
+msg.setReplicatedFrom("DLQ");
+CompletableFuture future = 
deadLetterTopicProducer.sendAsync(msg);
+future.whenCompleteAsync((messageId, 
error) -> {
+if (error != null) {
+log.error("[{}-{}] Fail to send 
message to dead letter topic {} {} {}",
+name, consumer, 
deadLetterTopic, error.getMessage(), error);
+
messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+latch.countDown();
+} else {
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
 
 Review comment:
   so the `redeliveryTracker.remove` should be called in `deleteCallback`, 
right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
sijie commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211724307
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -556,13 +618,90 @@ public synchronized void 
redeliverUnacknowledgedMessages(Consumer consumer) {
 
 @Override
 public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List positions) {
-positions.forEach(position -> 
messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
 if (log.isDebugEnabled()) {
 log.debug("[{}-{}] Redelivering unacknowledged messages for 
consumer {}", name, consumer, positions);
 }
+if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+for (PositionImpl position : positions) {
+if (redeliveryTracker.incrementAndGetRedeliveryCount(position) 
<= maxRedeliveryCount) {
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+} else {
+messagesToDeadLetter.add(position);
+}
+}
+if (messagesToDeadLetter.size() > 0) {
+CountDownLatch latch = new 
CountDownLatch(messagesToDeadLetter.size());
+for (PositionImpl position : messagesToDeadLetter) {
+cursor.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+@Override
+public void readEntryComplete(Entry entry, Object ctx) 
{
+if (entry == null) {
+log.error("[{}-{}] Read an null entry from 
cursor {}", name, consumer, position);
+latch.countDown();
+} else {
+try {
+ByteBuf headersAndPayload = 
entry.getDataBuffer();
+MessageImpl msg = 
MessageImpl.deserialize(headersAndPayload);
+headersAndPayload.retain();
+msg.setReplicatedFrom("DLQ");
+CompletableFuture future = 
deadLetterTopicProducer.sendAsync(msg);
+future.whenCompleteAsync((messageId, 
error) -> {
+if (error != null) {
+log.error("[{}-{}] Fail to send 
message to dead letter topic {} {} {}",
+name, consumer, 
deadLetterTopic, error.getMessage(), error);
+
messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+latch.countDown();
+} else {
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+latch.countDown();
+}
+});
+} catch (Throwable t) {
+log.error("[{}-{}] Failed to deserialize 
message at {} {} {} {}", name, consumer,
+entry.getPosition(), 
entry.getLedgerId(), t.getMessage(), t);
+cursor.asyncDelete(position, 
deleteCallback, position);
+redeliveryTracker.remove(position);
+entry.release();
+latch.countDown();
+}
+}
+}
+@Override
+public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+log.error("[{}-{}] Read entries failed {} {}", 
name, consumer, exception.getMessage(), exception);
+messagesToReplay.add(position.getLedgerId(), 
position.getEntryId());
+latch.countDown();
+}
+}, null);
+}
+try {
 
 Review comment:
   No sure I understand the problem here. Can you clarify it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new pull request #2419: [documentation] cherry-pick deploy-kubernetes documentation changes

2018-08-21 Thread GitBox
sijie opened a new pull request #2419: [documentation] cherry-pick 
deploy-kubernetes documentation changes
URL: https://github.com/apache/incubator-pulsar/pull/2419
 
 
   Cherry-pick changes from #2363 to 2.1.0 documentation
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar.wiki] branch master updated: Updated PIP 23: Message Tracing By Interceptors (markdown)

2018-08-21 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new ed4b0e9  Updated PIP 23: Message Tracing By Interceptors (markdown)
ed4b0e9 is described below

commit ed4b0e9a07bef40cb6aa85f9d430cf16cdfb175c
Author: Sijie Guo 
AuthorDate: Tue Aug 21 11:13:08 2018 -0700

Updated PIP 23: Message Tracing By Interceptors (markdown)
---
 PIP-23:-Message-Tracing-By-Interceptors.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/PIP-23:-Message-Tracing-By-Interceptors.md 
b/PIP-23:-Message-Tracing-By-Interceptors.md
index d57d558..58d5336 100644
--- a/PIP-23:-Message-Tracing-By-Interceptors.md
+++ b/PIP-23:-Message-Tracing-By-Interceptors.md
@@ -1,5 +1,5 @@
 - **Status**: Proposed
-- **Author**: [Penghui Li](https://github.com/codelipenghui)
+- **Author**: [Penghui Li](https://github.com/codelipenghui), [Jia 
Zhai](https://github.com/zhaijack)
 - **Pull Request**: -
 - **Mailing List discussion**: 
 



[GitHub] merlimat commented on issue #2418: Fix Description of Options on Pulsar Admin

2018-08-21 Thread GitBox
merlimat commented on issue #2418: Fix Description of Options on Pulsar Admin
URL: https://github.com/apache/incubator-pulsar/pull/2418#issuecomment-414754274
 
 
   run cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #2414: allow predicates concerning publish time to push down to pulsar

2018-08-21 Thread GitBox
jerrypeng commented on issue #2414:  allow predicates concerning publish time 
to push down to pulsar
URL: https://github.com/apache/incubator-pulsar/pull/2414#issuecomment-414744387
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] massakam commented on issue #2401: Consumer often discards received messages

2018-08-21 Thread GitBox
massakam commented on issue #2401: Consumer often discards received messages
URL: 
https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-414642945
 
 
   This phenomenon seems to happen when the consumer is receiving a lot of 
messages. For example, it is prone to happen when the consumer connects to a 
topic with a large number of messages in the backlog.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar.wiki] branch master updated: Created PIP 23: Message Tracing By Interceptors (markdown)

2018-08-21 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new e0388dd  Created PIP 23: Message Tracing By Interceptors (markdown)
e0388dd is described below

commit e0388dd8484f562109c6a4e689983883d7c1beba
Author: Sijie Guo 
AuthorDate: Tue Aug 21 03:47:09 2018 -0700

Created PIP 23: Message Tracing By Interceptors (markdown)
---
 PIP-23:-Message-Tracing-By-Interceptors.md | 239 +
 1 file changed, 239 insertions(+)

diff --git a/PIP-23:-Message-Tracing-By-Interceptors.md 
b/PIP-23:-Message-Tracing-By-Interceptors.md
new file mode 100644
index 000..d57d558
--- /dev/null
+++ b/PIP-23:-Message-Tracing-By-Interceptors.md
@@ -0,0 +1,239 @@
+- **Status**: Proposed
+- **Author**: [Penghui Li](https://github.com/codelipenghui)
+- **Pull Request**: -
+- **Mailing List discussion**: 
+
+## Requirement & Overview
+
+This is related to issue #2280. 
+And for Pulsar itself, That will be useful also to track performance issues 
and understand more of the latency breakdowns. Tracing is the easiest way to 
do: "show me a request with high-latency and show where the latency (for this 
particular request) was coming from.
+
+## GOALS
+
+A good approach to that, is to integrate a tracing standard, either 
OpenTracing or OpenCensus, at both bookkeeper and pulsar. so their applications 
can integrate with OpenTracing or OpenCensus as well,  then application's 
traceId, spanId can be passed all way from producer to broker to bookie and to 
consumer. Then those traces can be collected and fed into an ElasticSearch 
cluster for query.
+
+## IMPLEMENTATION
+
+Integration with a tracing system typically requires tight integration with 
applications. Especially if Pulsar is used in a data pipeline, Pulsar should be 
able to provide mechanism to be able to propagate trace context end-to-end. 
That says Pulsar doesn’t have to integrate with any tracing framework directly, 
but it has to provide mechanism for applications to do so. This can be done by:
+
+- Use `properties` in message header for propagating trace context.
+- Provide an interface to intercept message pipeline in Pulsar, which allows 
applications customize their own tracing behaviors and meet their requirements.
+
+This PIP introduces **Interceptor** to examine (and potentially modify) 
messages at key places during the lifecycle of a Pulsar message. The 
interceptors include `ProducerInterceptor` for intercepting messages at 
Producer side, and `ConsumerInterceptor` for intercepting messages at Consumer 
side.
+
+### Producer Interceptor
+
+The ProducerInterceptor intercept messages before sending them and on 
receiving acknowledgement from brokers. The interfaces for a 
ProducerInterceptor are defined as below:
+
+```java
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the 
+ * messages received by the producer before they are published to the Pulsar   
+ * brokers.
+ * 
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, 
but  
+ * not propagated further.
+ * 
+ * ProducerInterceptor callbacks may be called from multiple threads. 
Interceptor  
+ * implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor extends AutoCloseable {
+
+/**
+ * Close the interceptor.
+ */
+void close();
+
+/**
+ * This is called from {@link Producer#send(Object)} and {@link  
+ * Producer#sendAsync(Object)} methods, before
+ * send the message to the brokers. This method is allowed to modify the 
+ * record, in which case, the new record
+ * will be returned.
+ * 
+ * Any exception thrown by this method will be caught by the caller and 
+ * logged, but not propagated further.
+ * 
+ * Since the producer may run multiple interceptors, a particular 
+ * interceptor's {@link #beforeSend(Message)} callback will be called in 
the   
+ * order specified by 
+ * {@link ProducerBuilder#intercept(ProducerInterceptor[])}.
+ * 
+ * The first interceptor in the list gets the message passed from the 
client, 
+ * the following interceptor will be passed the message returned by the   
+ * previous interceptor, and so on. Since interceptors are allowed to 
modify
+ * messages, interceptors may potentially get the message already modified 
by   
+ * other interceptors. However, building a pipeline of mutable 
interceptors   
+ * that depend on the output of the previous interceptor is discouraged, 
+ * because of potential side-effects caused by interceptors potentially   
+ * failing to modify the message and throwing an exception. If one of the 
+ * interceptors in the list throws an exception from   
+ * {@link#beforeSen

[incubator-pulsar.wiki] branch master updated: Updated Home (markdown)

2018-08-21 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new 7d081a9  Updated Home (markdown)
7d081a9 is described below

commit 7d081a92f165cb1e0f3067f4709b80cd5da23d28
Author: Sijie Guo 
AuthorDate: Tue Aug 21 03:39:08 2018 -0700

Updated Home (markdown)
---
 Home.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/Home.md b/Home.md
index cf0aa19..74324a2 100644
--- a/Home.md
+++ b/Home.md
@@ -5,10 +5,11 @@
 
 We encourage to document any big change or feature or any addition to public 
user APIs through a design document to be discussed with the community.
 
-*Next Proposal Number: 23*
+*Next Proposal Number: 24*
 
 ### Proposed
 
+* [[PIP 23: Message Tracing By Interceptors]]
 * [[PIP 22: Pulsar Dead Letter Topic]]
 * [[PIP 21: Pulsar Edge Component]]
 



[GitHub] k2la opened a new pull request #2418: Fix Description of Options on Pulsar Admin

2018-08-21 Thread GitBox
k2la opened a new pull request #2418: Fix Description of Options on Pulsar Admin
URL: https://github.com/apache/incubator-pulsar/pull/2418
 
 
   ### Motivation
   
   https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#get-1
   https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#getstatus
   https://pulsar.incubator.apache.org/docs/en/pulsar-admin/#list-2
   
   All `to delete` in description of options of the above URLs are unnecessary.
   
   ### Modifications
   
   Remove `to delete`.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2397: Integrate Functions with Schema

2018-08-21 Thread GitBox
srkukarni commented on a change in pull request #2397: Integrate Functions with 
Schema
URL: https://github.com/apache/incubator-pulsar/pull/2397#discussion_r211515618
 
 

 ##
 File path: 
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 ##
 @@ -161,15 +161,19 @@
 
 /**
  * Publish an object using serDe for serializing to the topic
- * @param topicName The name of the topic for publishing
- * @param object The object that needs to be published
- * @param serDeClassName The class name of the class that needs to be used 
to serialize the object before publishing
+ *
+ * @param topicName
+ *The name of the topic for publishing
+ * @param object
+ *The object that needs to be published
+ * @param topicsPattern
+ *Either a builtin schema type (eg: "avro", "json", 
"protobuf") or the class name of the custom schema class
  * @return A future that completes when the framework is done publishing 
the message
  */
- CompletableFuture publish(String topicName, O object, String 
serDeClassName);
+ CompletableFuture publish(String topicName, O object, String 
schemaType);
 
 Review comment:
   Serde's are not being deprecated. The only thing that is being deprecated is 
DefaultSerde. Thus I believe we should add another interface supporting schema 
rather than delete this


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2397: Integrate Functions with Schema

2018-08-21 Thread GitBox
srkukarni commented on a change in pull request #2397: Integrate Functions with 
Schema
URL: https://github.com/apache/incubator-pulsar/pull/2397#discussion_r211515762
 
 

 ##
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
 ##
 @@ -469,20 +477,56 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
 // set source spec
 // source spec classname should be empty so that the default 
pulsar source will be used
 SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
 
 Review comment:
   serde's are not deprecated. Only DefaultSerde is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services