This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 397ef33  fix: non-batched messages cause sql query to fail (#3684)
397ef33 is described below

commit 397ef3322e8b082817ecb70351d4923e414daf9f
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Feb 25 18:21:32 2019 -0800

    fix: non-batched messages cause sql query to fail (#3684)
---
 .../org/apache/pulsar/common/api/raw/MessageParser.java     |  2 +-
 .../pulsar/tests/integration/presto/TestBasicPresto.java    | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index d894948..1f1d66e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -86,7 +86,7 @@ public class MessageParser {
 
             if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
                 processor.process(
-                        RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
+                        RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
             } else {
                 // handle batch message enqueuing; uncompressed payload has all messages in batch
                 receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 505ada4..48cffb7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -67,7 +67,16 @@ public class TestBasicPresto extends PulsarTestSuite {
     }
 
     @Test
-    public void testSimpleSQLQuery() throws Exception {
+    public void testSimpleSQLQueryBatched() throws Exception {
+        testSimpleSQLQuery(true);
+    }
+
+    @Test
+    public void testSimpleSQLQueryNonBatched() throws Exception {
+        testSimpleSQLQuery(false);
+    }
+
+    public void testSimpleSQLQuery(boolean isBatched) throws Exception {
 
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
@@ -79,9 +88,9 @@ public class TestBasicPresto extends PulsarTestSuite {
         @Cleanup
         Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
                 .topic(stocksTopic)
+                .enableBatching(isBatched)
                 .create();
 
-
         for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
             final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
             producer.send(stock);