This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 397ef33  fix: non-batched messages cause sql query to fail (#3684)
397ef33 is described below

commit 397ef3322e8b082817ecb70351d4923e414daf9f
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Feb 25 18:21:32 2019 -0800

    fix: non-batched messages cause sql query to fail (#3684)
---
 .../org/apache/pulsar/common/api/raw/MessageParser.java     |  2 +-
 .../pulsar/tests/integration/presto/TestBasicPresto.java    | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index d894948..1f1d66e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -86,7 +86,7 @@ public class MessageParser {
 
             if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
                 processor.process(
-                        RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
+                        RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
             } else {
                 // handle batch message enqueuing; uncompressed payload has all messages in batch
                 receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 505ada4..48cffb7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -67,7 +67,16 @@ public class TestBasicPresto extends PulsarTestSuite {
     }
 
     @Test
-    public void testSimpleSQLQuery() throws Exception {
+    public void testSimpleSQLQueryBatched() throws Exception {
+        testSimpleSQLQuery(true);
+    }
+
+    @Test
+    public void testSimpleSQLQueryNonBatched() throws Exception {
+        testSimpleSQLQuery(false);
+    }
+    
+    public void testSimpleSQLQuery(boolean isBatched) throws Exception {
 
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
@@ -79,9 +88,9 @@ public class TestBasicPresto extends PulsarTestSuite {
         @Cleanup
         Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
                 .topic(stocksTopic)
+                .enableBatching(isBatched)
                 .create();
 
-
         for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
             final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
             producer.send(stock);

Reply via email to