[GitHub] [pulsar] sijie commented on a change in pull request #4911: expose getLastMessageId method in ConsumerImpl

2019-08-13 Thread GitBox
sijie commented on a change in pull request #4911: expose getLastMessageId 
method in ConsumerImpl
URL: https://github.com/apache/pulsar/pull/4911#discussion_r313229980
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
 ##
 @@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.pulsar.client.api.MessageId;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * A MessageId implementation that contains a map of .
+ * This is useful when MessageId is need for partition/multi-topics/pattern 
consumer.
+ * e.g. seek(), ackCumulative(), getLastMessageId().
+ */
+public class MultiMessageIdImpl implements MessageId {
+@Getter
+private Map map;
+
+MultiMessageIdImpl(Map map) {
+this.map = map;
+}
+
+// TODO: Add support for Serialization and Deserialization
+//  https://github.com/apache/pulsar/issues/4940
+@Override
+public byte[] toByteArray() {
+throw new NotImplementedException();
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(map);
+}
+
+// If all messageId in map are same size, and all bigger/smaller than the 
other, return valid value.
+@Override
+public int compareTo(MessageId o) {
+if (!(o instanceof MultiMessageIdImpl)) {
+throw new IllegalArgumentException(
+"expected MultiMessageIdImpl object. Got instance of " + 
o.getClass().getName());
+}
+
+MultiMessageIdImpl other = (MultiMessageIdImpl) o;
+Map otherMap = other.getMap();
+
+if ((map == null || map.isEmpty()) && (otherMap == null || 
otherMap.isEmpty())) {
+return 0;
+}
+
+if (otherMap == null || map == null || otherMap.size() != map.size()) {
+throw new IllegalArgumentException("Current size and other size 
not equals");
+}
+
+int result = 0;
+for (Entry entry : map.entrySet()) {
+MessageId otherMessage = otherMap.get(entry.getKey());
+if (otherMessage == null) {
+throw new IllegalArgumentException(
+"Other MessageId not have topic " + entry.getKey());
+}
+
+int currentResult = entry.getValue().compareTo(otherMessage);
+if (result == 0) {
+result = currentResult;
 
 Review comment:
   I think the logic here is problematic: it resets the result to the latest 
per-partition result.
   
   A MultipleMessageId is larger than the other one when every partition's 
messageId is larger than or equal to the corresponding messageId in the other, no?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] jiazhai commented on a change in pull request #4911: expose getLastMessageId method in ConsumerImpl

2019-08-13 Thread GitBox
jiazhai commented on a change in pull request #4911: expose getLastMessageId 
method in ConsumerImpl
URL: https://github.com/apache/pulsar/pull/4911#discussion_r313306254
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
 ##
 @@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.pulsar.client.api.MessageId;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * A MessageId implementation that contains a map of .
+ * This is useful when MessageId is need for partition/multi-topics/pattern 
consumer.
+ * e.g. seek(), ackCumulative(), getLastMessageId().
+ */
+public class MultiMessageIdImpl implements MessageId {
+@Getter
+private Map map;
+
+MultiMessageIdImpl(Map map) {
+this.map = map;
+}
+
+// TODO: Add support for Serialization and Deserialization
+//  https://github.com/apache/pulsar/issues/4940
+@Override
+public byte[] toByteArray() {
+throw new NotImplementedException();
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(map);
+}
+
+// If all messageId in map are same size, and all bigger/smaller than the 
other, return valid value.
+@Override
+public int compareTo(MessageId o) {
+if (!(o instanceof MultiMessageIdImpl)) {
+throw new IllegalArgumentException(
+"expected MultiMessageIdImpl object. Got instance of " + 
o.getClass().getName());
+}
+
+MultiMessageIdImpl other = (MultiMessageIdImpl) o;
+Map otherMap = other.getMap();
+
+if ((map == null || map.isEmpty()) && (otherMap == null || 
otherMap.isEmpty())) {
+return 0;
+}
+
+if (otherMap == null || map == null || otherMap.size() != map.size()) {
+throw new IllegalArgumentException("Current size and other size 
not equals");
+}
+
+int result = 0;
+for (Entry entry : map.entrySet()) {
+MessageId otherMessage = otherMap.get(entry.getKey());
+if (otherMessage == null) {
+throw new IllegalArgumentException(
+"Other MessageId not have topic " + entry.getKey());
+}
+
+int currentResult = entry.getValue().compareTo(otherMessage);
+if (result == 0) {
+result = currentResult;
 
 Review comment:
   1. If result == 0 && currentResult == 0, this assignment has no effect.
   2. If result == 0 && currentResult != 0, we keep the non-zero result; this 
is the result that may finally be returned.
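
   To make the two cases above concrete, here is a minimal Python sketch of the fold. It models only the `if (result == 0) result = currentResult;` step that is visible in the quoted hunk; the rest of the Java loop is not shown there, so nothing beyond that step is modeled, and the name `fold_results` is hypothetical.

```python
def fold_results(per_partition_results):
    """Keep the first non-zero per-partition comparison result.

    Models only the `if (result == 0) result = currentResult;` step
    from the quoted diff. Any handling of later, conflicting results
    is not visible in that hunk and is not modeled here.
    """
    result = 0
    for current in per_partition_results:
        if result == 0:
            # Case 1: current == 0 leaves result at 0 (no effect).
            # Case 2: current != 0 becomes the candidate return value.
            result = current
    return result
```

   In this simplified model a later partition with the opposite sign is silently ignored; whether the real method rejects that case cannot be determined from the quoted lines.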
   




[GitHub] [pulsar] kimcs commented on issue #4912: Java client Reader readNext and hasMessageAvailable not working when attempting to find last message in topic

2019-08-13 Thread GitBox
kimcs commented on issue #4912: Java client Reader readNext and 
hasMessageAvailable not working when attempting to find last message in topic
URL: https://github.com/apache/pulsar/issues/4912#issuecomment-520805615
 
 
   Currently I'm using the following workaround to be able to read the last 
message of a topic:
   Use the pulsar-admin-client to get the last-message-id of a topic (-1 means 
empty topic), then use pulsar-client Consumer.seek to that id and then the next 
receive call will get the last message of the topic. This is not an ideal 
solution as it also relies on the pulsar-admin client.




[GitHub] [pulsar-client-go] jiazhai commented on a change in pull request #43: Support partition consumer receive async and fix batch logic

2019-08-13 Thread GitBox
jiazhai commented on a change in pull request #43: Support partition consumer 
receive async and fix batch logic
URL: https://github.com/apache/pulsar-client-go/pull/43#discussion_r313375268
 
 

 ##
 File path: pulsar/impl_partition_consumer.go
 ##
 @@ -274,30 +320,45 @@ func (pc *partitionConsumer) ReceiveAsync(ctx 
context.Context, msgs chan<- Consu
case tmpMsg, ok := <-pc.subQueue:
if ok {
msgs <- tmpMsg
-   id := &pb.MessageIdData{}
-   err := proto.Unmarshal(tmpMsg.ID().Serialize(), 
id)
+
+   err := pc.messageProcessed(tmpMsg.ID(), 
receivedSinceFlow)
if err != nil {
-   
pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
return err
}
-   if pc.unAckTracker != nil {
-   pc.unAckTracker.Add(id)
-   }
-   receivedSinceFlow++
-   if receivedSinceFlow >= highwater {
-   if err := 
pc.internalFlow(receivedSinceFlow); err != nil {
-   pc.log.Errorf("Send Flow cmd 
error:%s", err.Error())
-   return err
-   }
-   receivedSinceFlow = 0
-   }
continue
}
+   break
case <-ctx.Done():
return ctx.Err()
}
}
+}
+
+func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, 
callback func(msg Message, err error)) {
+   highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
+
+   // request half the buffer's capacity
+   if err := pc.internalFlow(highwater); err != nil {
 
 Review comment:
   Try to send this flow request in the background.




[GitHub] [pulsar-client-go] jiazhai commented on a change in pull request #43: Support partition consumer receive async and fix batch logic

2019-08-13 Thread GitBox
jiazhai commented on a change in pull request #43: Support partition consumer 
receive async and fix batch logic
URL: https://github.com/apache/pulsar-client-go/pull/43#discussion_r313375713
 
 

 ##
 File path: pulsar/impl_partition_consumer.go
 ##
 @@ -238,32 +252,64 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*handleUnsubscribe) {
unsub.waitGroup.Done()
 }
 
-func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
-   select {
-   case <-ctx.Done():
-   return nil, ctx.Err()
-   case cm, ok := <-pc.subQueue:
-   if ok {
-   id := &pb.MessageIdData{}
-   err := proto.Unmarshal(cm.ID().Serialize(), id)
-   if err != nil {
-   pc.log.WithError(err).Errorf("unserialize 
message id error:%s", err.Error())
-   return nil, err
-   }
-   if pc.unAckTracker != nil {
-   pc.unAckTracker.Add(id)
-   }
-   return cm.Message, nil
+func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
+   id := &pb.MessageIdData{}
+   err := proto.Unmarshal(msgID.Serialize(), id)
+   if err != nil {
+   pc.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
+   return err
+   }
+   if pc.unAckTracker != nil {
+   pc.unAckTracker.Add(id)
+   }
+   return nil
+}
+
+func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow 
uint32) error {
+   highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
+   if receivedSinceFlow >= highwater {
+   if err := pc.internalFlow(receivedSinceFlow); err != nil {
+   pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+   return err
}
-   return nil, newError(ResultConnectError, "receive queue closed")
+   receivedSinceFlow = 0
+   }
+   return nil
+}
+
+func (pc *partitionConsumer) messageProcessed(msgID MessageID, 
receivedSinceFlow uint32) error {
+   err := pc.trackMessage(msgID)
+   if err != nil {
+   return err
+   }
+   receivedSinceFlow++
+
+   err = pc.increaseAvailablePermits(receivedSinceFlow)
+   if err != nil {
+   return err
}
+
+   return nil
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, 
err error) {
+   wg := &sync.WaitGroup{}
+   wg.Add(1)
+   pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
+   message = msg
+   err = e
+   wg.Done()
+   })
+   wg.Wait()
+
+   return message, err
 }
 
 func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- 
ConsumerMessage) error {
-   highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
+   highWater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 
1))
 
// request half the buffer's capacity
-   if err := pc.internalFlow(highwater); err != nil {
+   if err := pc.internalFlow(highWater); err != nil {
 
 Review comment:
   Same here: send the flow request in the background.




[GitHub] [pulsar] tuteng commented on issue #4918: [dashboard] fix peek parse message error

2019-08-13 Thread GitBox
tuteng commented on issue #4918: [dashboard] fix peek parse message error
URL: https://github.com/apache/pulsar/pull/4918#issuecomment-520874584
 
 
   lgtm




[GitHub] [pulsar] codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java client.

2019-08-13 Thread GitBox
codelipenghui commented on issue #4621: [PIP-38] Support batch receive in java 
client.
URL: https://github.com/apache/pulsar/pull/4621#issuecomment-520881629
 
 
   run java8 tests




[GitHub] [pulsar] mingfang commented on issue #3121: Several issues with the kubernetes generic example

2019-08-13 Thread GitBox
mingfang commented on issue #3121: Several issues with the kubernetes generic 
example
URL: https://github.com/apache/pulsar/issues/3121#issuecomment-520952751
 
 
   I'm still seeing this problem with 2.4.0.
   I'm deploying the bookie as a StatefulSet and using only in-container 
storage for testing.
   The error seems to be related to the ZooKeeper info, not to storage.




[GitHub] [pulsar] marcusalmeida opened a new pull request #4946: [pulsar-functions][python-examples] Fixing python function example custom_object_function.py

2019-08-13 Thread GitBox
marcusalmeida opened a new pull request #4946: 
[pulsar-functions][python-examples] Fixing python function example 
custom_object_function.py
URL: https://github.com/apache/pulsar/pull/4946
 
 
   ### Motivation
   
   Fixing Python examples for pulsar-functions custom_object_function.py
   
   ### Modifications
   
   Add conversion between `str` and `bytes` using `str.encode` and `bytes.decode`.
   
   To test, create the function:
   ```sh
   $ bin/pulsar-admin functions create \
       --py $PULSAR_HOME/examples/python-examples/custom_object_function.py \
       --tenant public \
       --namespace default \
       --name custom_object \
       --inputs input-topic \
       --custom-serde-inputs '{"input-topic": "custom_object_function.CustomSerDe"}' \
       --output output-topic \
       --output-serde-classname custom_object_function.CustomSerDe \
       --classname custom_object_function.CustomObjectFunction
   ```
   
   Then trigger the function with the CLI `functions trigger` command:
   ```sh
   $ pulsar-admin functions trigger \
       --name custom_object \
       --tenant public \
       --namespace default \
       --trigger-value "1,2"
   ```
   It should return:

   12, 26
   

   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API: no
 - The schema: no
 - The default values of configurations: no
 - The wire protocol: no
 - The rest endpoints: no
 - The admin cli options: no
 - Anything that affects deployment: no
   
   ### Documentation
   
 - Does this pull request introduce a new feature? no
 




[GitHub] [pulsar] massakam commented on issue #4945: [pulsar-websocket] Do not log stack trace with warn log in websocket proxy

2019-08-13 Thread GitBox
massakam commented on issue #4945: [pulsar-websocket] Do not log stack trace 
with warn log in websocket proxy
URL: https://github.com/apache/pulsar/pull/4945#issuecomment-521066796
 
 
   rerun cpp tests




[pulsar] branch master updated: [dashboard] fix peek parse message error (#4918)

2019-08-13 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a8b57c9  [dashboard] fix peek parse message error (#4918)
a8b57c9 is described below

commit a8b57c97c058862bce1c5b9d3435768d2071f09b
Author: Yi Tang 
AuthorDate: Wed Aug 14 09:25:50 2019 +0800

[dashboard] fix peek parse message error (#4918)

Fixes #4917

### Motivation

The dashboard peek-message API raises an exception when formatting the 
response because it treats every message as a JSON string.

### Modifications

* Format as JSON only if the message is JSON;
* otherwise, if the message is printable, return the original message;
* otherwise, print a hex dump, as the command with the --hex option does.
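
The fallback order above can be sketched with the standard library alone. This is an illustrative sketch, not the committed code: `classify_payload` is a hypothetical name, the input is assumed to be UTF-8, and `bytes.hex()` stands in for the `hexdump` package that this commit installs.

```python
import json


def classify_payload(payload: bytes):
    """Return (kind, rendered) using a JSON -> Text -> Hex fallback."""
    try:
        text = payload.decode("utf-8")  # assumption: UTF-8 input
    except UnicodeDecodeError:
        return "Hex", payload.hex()
    if not text.isprintable():
        # Control characters (including newlines) fall through to hex.
        return "Hex", payload.hex()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return "Text", text
    return "JSON", json.dumps(parsed, ensure_ascii=False, indent=4)
```

Checking printability before attempting the JSON parse means that pretty-printed JSON containing newlines is rendered as hex in this sketch, because `str.isprintable()` rejects newline characters.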
---
 dashboard/Dockerfile |  2 +-
 dashboard/django/stats/templates/stats/peek.html |  5 ++-
 dashboard/django/stats/views.py  | 47 ++--
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/dashboard/Dockerfile b/dashboard/Dockerfile
index 01c84ea..6af19c4 100644
--- a/dashboard/Dockerfile
+++ b/dashboard/Dockerfile
@@ -25,7 +25,7 @@ RUN apt-get update
 RUN apt-get -y install postgresql python sudo nginx supervisor
 
 # Python dependencies
-RUN pip install uwsgi 'Django<2.0' psycopg2 pytz requests
+RUN pip install uwsgi 'Django<2.0' psycopg2 pytz requests hexdump
 
 # Postgres configuration
 COPY conf/postgresql.conf /etc/postgresql/9.6/main/
diff --git a/dashboard/django/stats/templates/stats/peek.html 
b/dashboard/django/stats/templates/stats/peek.html
index ebe4821..17d66b0 100644
--- a/dashboard/django/stats/templates/stats/peek.html
+++ b/dashboard/django/stats/templates/stats/peek.html
@@ -19,4 +19,7 @@
 
 -->
 
-{{ message_body }}
\ No newline at end of file
+
+{{ message_type }}
+{{ message_body }}
+
diff --git a/dashboard/django/stats/views.py b/dashboard/django/stats/views.py
index b49994b..d0a7af4 100644
--- a/dashboard/django/stats/views.py
+++ b/dashboard/django/stats/views.py
@@ -18,12 +18,15 @@
 #
 
 import logging
+import struct
+
 from django.shortcuts import render, get_object_or_404, redirect
 from django.template import loader
 from django.urls import reverse
 from django.views import generic
 from django.db.models import Q, IntegerField
 from dashboard import settings
+import hexdump
 import requests, json, re
 
 from django.http import HttpResponseRedirect, HttpResponse
@@ -363,13 +366,51 @@ def messages(request, topic_name, subscription_name):
 'subtitle' : subscription_name,
 })
 
+
+def message_skip_meta(message_view):
+if not message_view or len(message_view) < 4:
+raise ValueError("invalid message")
+meta_size = struct.unpack(">I", message_view[:4])
+message_index = 4 + meta_size[0]
+if len(message_view) < message_index:
+raise ValueError("invalid message")
+return message_view[message_index:]
+
+
+def get_message_from_http_response(response):
+if response.status_code != 200:
+return "ERROR", "status_code=%d" % response.status_code
+message_view = memoryview(response.content)
+if 'X-Pulsar-num-batch-message' in response.headers:
+batch_size = int(response.headers['X-Pulsar-num-batch-message'])
+if batch_size == 1:
+message_view = message_skip_meta(message_view)
+else:
+# TODO: can not figure out multi-message batch for now
+return "Batch(size=%d)" % batch_size, ""
+
+try:
+text = str(message_view,
+   encoding=response.encoding or response.apparent_encoding,
+   errors='replace')
+if not text.isprintable():
+return "Hex", hexdump.hexdump(message_view, result='return')
+except (LookupError, TypeError):
+return "Hex", hexdump.hexdump(message_view, result='return')
+try:
+return "JSON", json.dumps(json.loads(text),
+  ensure_ascii=False, indent=4)
+except json.JSONDecodeError:
+return "Text", text
+
+
 def peek(request, topic_name, subscription_name, message_number):
 url = settings.SERVICE_URL + '/admin/v2/' + topic_name + '/subscription/' 
+ subscription_name + '/position/' + message_number
 response = requests.get(url)
-message = response.text
-message = message[message.index('{'):]
+message_type, message = get_message_from_http_response(response)
 context = {
-'message_body' : json.dumps(json.loads(message), indent=4),
+'message_type': message_type,
+'message_body': message,
 }
 return render(request, 'stats/peek.html', context)
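
The `message_skip_meta` helper in this diff reads the first four bytes as a big-endian unsigned length and skips that many metadata bytes. The round trip below is a small illustration of that framing using only `struct`; `frame` and `skip_meta` are hypothetical names, and the metadata bytes are made-up placeholders rather than real Pulsar metadata.

```python
import struct


def frame(meta: bytes, body: bytes) -> bytes:
    """Prefix `meta` with its length as a 4-byte big-endian unsigned int."""
    return struct.pack(">I", len(meta)) + meta + body


def skip_meta(message: bytes) -> bytes:
    """Return the bytes that follow the length-prefixed metadata section."""
    if len(message) < 4:
        raise ValueError("invalid message")
    (meta_size,) = struct.unpack(">I", message[:4])
    body_start = 4 + meta_size
    if len(message) < body_start:
        # The declared metadata length runs past the end of the buffer.
        raise ValueError("invalid message")
    return message[body_start:]
```

For example, `skip_meta(frame(b"meta", b"payload"))` yields `b"payload"`, while a buffer whose declared metadata length exceeds the remaining bytes raises `ValueError`.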
 



[GitHub] [pulsar] sijie closed issue #4917: [dashboard] peek parse response raise exception

2019-08-13 Thread GitBox
sijie closed issue #4917: [dashboard] peek parse response raise exception
URL: https://github.com/apache/pulsar/issues/4917
 
 
   




[GitHub] [pulsar] sijie merged pull request #4918: [dashboard] fix peek parse message error

2019-08-13 Thread GitBox
sijie merged pull request #4918: [dashboard] fix peek parse message error
URL: https://github.com/apache/pulsar/pull/4918
 
 
   




[GitHub] [pulsar] sijie commented on a change in pull request #4911: expose getLastMessageId method in ConsumerImpl

2019-08-13 Thread GitBox
sijie commented on a change in pull request #4911: expose getLastMessageId 
method in ConsumerImpl
URL: https://github.com/apache/pulsar/pull/4911#discussion_r313676335
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java
 ##
 @@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.pulsar.client.api.MessageId;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * A MessageId implementation that contains a map of .
+ * This is useful when MessageId is need for partition/multi-topics/pattern 
consumer.
+ * e.g. seek(), ackCumulative(), getLastMessageId().
+ */
+public class MultiMessageIdImpl implements MessageId {
+@Getter
+private Map map;
+
+MultiMessageIdImpl(Map map) {
+this.map = map;
+}
+
+// TODO: Add support for Serialization and Deserialization
+//  https://github.com/apache/pulsar/issues/4940
+@Override
+public byte[] toByteArray() {
+throw new NotImplementedException();
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(map);
+}
+
+// If all messageId in map are same size, and all bigger/smaller than the 
other, return valid value.
+@Override
+public int compareTo(MessageId o) {
+if (!(o instanceof MultiMessageIdImpl)) {
+throw new IllegalArgumentException(
+"expected MultiMessageIdImpl object. Got instance of " + 
o.getClass().getName());
+}
+
+MultiMessageIdImpl other = (MultiMessageIdImpl) o;
+Map otherMap = other.getMap();
+
+if ((map == null || map.isEmpty()) && (otherMap == null || 
otherMap.isEmpty())) {
+return 0;
+}
+
+if (otherMap == null || map == null || otherMap.size() != map.size()) {
+throw new IllegalArgumentException("Current size and other size 
not equals");
+}
+
+int result = 0;
+for (Entry entry : map.entrySet()) {
+MessageId otherMessage = otherMap.get(entry.getKey());
+if (otherMessage == null) {
+throw new IllegalArgumentException(
+"Other MessageId not have topic " + entry.getKey());
+}
+
+int currentResult = entry.getValue().compareTo(otherMessage);
+if (result == 0) {
+result = currentResult;
 
 Review comment:
   ok




[GitHub] [pulsar-client-go] wolfstudy opened a new issue #53: Fatal error: concurrent map writes

2019-08-13 Thread GitBox
wolfstudy opened a new issue #53: Fatal error: concurrent map writes
URL: https://github.com/apache/pulsar-client-go/issues/53
 
 
   #### Expected behavior
   
   The program works fine when we create multiple partitions.
   
   #### Actual behavior
   
   ```
   fatal error: concurrent map writes
   
   goroutine 119 [running]:
   runtime.throw(0x4501a47, 0x15)
/usr/local/Cellar/go/1.12.5/libexec/src/runtime/panic.go:617 +0x72 
fp=0xc0003dfe88 sp=0xc0003dfe58 pc=0x402fbf2
   runtime.mapassign_fast64(0x4478040, 0xc000277a40, 0x34, 0x0)
/usr/local/Cellar/go/1.12.5/libexec/src/runtime/map_fast64.go:101 
+0x35f fp=0xc0003dfec8 sp=0xc0003dfe88 pc=0x4013cdf
   
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00016e2a0)

/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:261
 +0x19d fp=0xc0003dffb8 sp=0xc0003dfec8 pc=0x430c55d
   
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc00016e2a0)

/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:146
 +0x59 fp=0xc0003dffd8 sp=0xc0003dffb8 pc=0x4311b09
   runtime.goexit()
/usr/local/Cellar/go/1.12.5/libexec/src/runtime/asm_amd64.s:1337 +0x1 
fp=0xc0003dffe0 sp=0xc0003dffd8 pc=0x405e331
   created by 
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start

/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:143
 +0x3f
   
   goroutine 1 [chan receive]:
   testing.(*T).Run(0xc000466000, 0x45075c9, 0x21, 0x4516718, 0x407cb01)
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:917 +0x381
   testing.runTests.func1(0xc00015e000)
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1157 +0x78
   testing.tRunner(0xc00015e000, 0xce7e30)
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:865 +0xc0
   testing.runTests(0xccc480, 0x48a7d00, 0x23, 0x23, 0x0)
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1155 +0x2a9
   testing.(*M).Run(0xc00014, 0x0)
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1072 +0x162
   main.main()
_testmain.go:110 +0x13e
   
   goroutine 81 [chan receive]:
   github.com/apache/pulsar-client-go/pulsar.newProducer(0xc000222780, 
0xc000141480, 0xc0003803f0, 0xc000466100, 0xc000315ef0)

/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:92 
+0x1e4
   
github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer(0xc000222780,
 0x450cf17, 0x2d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)

/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/impl_client.go:91 +0x7a
   
github.com/apache/pulsar-client-go/pulsar.TestPartitionTopicsConsumerPubSub(0xc000466000)

/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:367 
+0x277
   testing.tRunner(0xc000466000, 0x4516718)
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:865 +0xc0
   created by testing.(*T).Run
/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:916 +0x35a
   ```
   
   #### Steps to reproduce
   
   How can we reproduce the issue
   
   #### System configuration
   **Pulsar version**: x.y
   




[GitHub] [pulsar-client-go] wolfstudy merged pull request #43: Support partition consumer receive async and fix batch logic

2019-08-13 Thread GitBox
wolfstudy merged pull request #43: Support partition consumer receive async and 
fix batch logic
URL: https://github.com/apache/pulsar-client-go/pull/43
 
 
   




[GitHub] [pulsar-client-go] wolfstudy closed issue #36: Support partition-consumer receive async logic

2019-08-13 Thread GitBox
wolfstudy closed issue #36: Support partition-consumer receive async logic
URL: https://github.com/apache/pulsar-client-go/issues/36
 
 
   




[pulsar-client-go] branch master updated: Support partition consumer receive async and fix batch logic (#43)

2019-08-13 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 448387d  Support partition consumer receive async and fix batch logic (#43)
448387d is described below

commit 448387d738a2f3af4c8232daa4fac9576d252617
Author: 冉小龙 
AuthorDate: Wed Aug 14 11:00:00 2019 +0800

Support partition consumer receive async and fix batch logic (#43)

Signed-off-by: xiaolong.ran ranxiaolong...@gmail.com

* Support batch logic for project

* add unit test case of event time

* add some unit tests case for producer

* fix error result type

* add unit test case of producer flush

* add receiver queue size test logic

* support partition consumer receive async

* add unit test case of ack timeout

* Fix consumer receiving message out of order
---
 pulsar/consumer.go                |   3 +
 pulsar/consumer_test.go           | 301 +-
 pulsar/error.go                   |   2 +-
 pulsar/impl_consumer.go           |  55 +--
 pulsar/impl_partition_consumer.go | 217 ---
 pulsar/impl_partition_producer.go |  21 ++-
 pulsar/internal/commands.go       |  82 +++
 pulsar/internal/connection.go     |  54 +++
 pulsar/producer_test.go           | 260 +++-
 pulsar/unackedMsgTracker.go       |  20 +--
 util/util.go                      |  24 ++-
 util/util_test.go                 |  21 ++-
 12 files changed, 870 insertions(+), 190 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index de190e0..c259cd6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -142,6 +142,9 @@ type Consumer interface {
 	// ReceiveAsync appends the message to the msgs channel asynchronously.
 	ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
 
+	// ReceiveAsyncWithCallback returns a callback containing the message and error objects
+	ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+
 	// Ack the consumption of a single message
 	Ack(Message) error
 
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 39646d3..6fe86cd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
@@ -124,6 +122,67 @@ func TestConsumerConnectError(t *testing.T) {
 	assert.Equal(t, err.Error(), "connection error")
 }
 
+func TestBatchMessageReceive(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "persistent://public/default/receive-batch"
+	subName := "subscription-name"
+	prefix := "msg-batch-"
+	ctx := context.Background()
+
+	// Enable batching on producer side
+	batchSize, numOfMessages := 2, 100
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topicName,
+		BatchingMaxMessages: uint(batchSize),
+		DisableBatching:     false,
+		BlockIfQueueFull:    true,
+	})
+	assert.Nil(t, err)
+	assert.Equal(t, topicName, producer.Topic())
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: subName,
+	})
+	assert.Equal(t, topicName, consumer.Topic())
+	count := 0
+
+	for i := 0; i < numOfMessages; i++ {
+		messageContent := prefix + fmt.Sprintf("%d", i)
+		msg := &ProducerMessage{
+			Payload: []byte(messageContent),
+		}
+		err := producer.Send(ctx, msg)
+		assert.Nil(t, err)
+	}
+
+	for i := 0; i < numOfMessages; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
+		count++
+	}
+
+	// check strategically
+	for i := 0; i < 3; i++ {
+		if count == numOfMessages {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	assert.Equal(t, count, numOfMessages)
+}
+
+
 func TestConsumerWithInvalidConf(t *testing.T) {
client, err := NewClient(C

[GitHub] [pulsar-client-go] wolfstudy opened a new pull request #54: [Issue:53]v

2019-08-13 Thread GitBox
wolfstudy opened a new pull request #54: [Issue:53]v
URL: https://github.com/apache/pulsar-client-go/pull/54
 
 
   Signed-off-by: xiaolong.ran 
   
   Fixes #53 
   
   ### Motivation
   
   Fix the concurrent map write and add a test case for it.
   




[GitHub] [pulsar] kimcs commented on issue #4928: [java-client] ConsumerBuilder should have option for defaultSubscriptionInitialPosition

2019-08-13 Thread GitBox
kimcs commented on issue #4928: [java-client] ConsumerBuilder should have option for defaultSubscriptionInitialPosition
URL: https://github.com/apache/pulsar/issues/4928#issuecomment-521124404
 
 
   I have changed my design and now rely on the Pulsar client application to 
track positions rather than on subscriptions and acknowledgements, so I no 
longer need this feature for what I'm working on. I still think this option is 
useful, as it allows Pulsar subscriptions to be the master of stream progress 
with at-least-once semantics.

