This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ac10b00 Fix: predicate pushdown for Pulsar SQL NPE (#4744) ac10b00 is described below commit ac10b006cf59308ae1a0bf8307ddcb2f5745cc11 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Jul 18 11:59:33 2019 -0700 Fix: predicate pushdown for Pulsar SQL NPE (#4744) * Fix: predicate pushdown for Pulsar SQL NPE * fix unit test --- .../bookkeeper/mledger/impl/OpFindNewest.java | 2 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 7 +- .../service/PersistentMessageFinderTest.java | 2 +- tests/integration/pom.xml | 7 ++ .../containers/PrestoWorkerContainer.java | 5 ++ .../tests/integration/presto/TestBasicPresto.java | 92 +++++++++++++++++++++- 6 files changed, 109 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 60ecacf..57e8044 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -64,7 +64,7 @@ class OpFindNewest implements ReadEntryCallback { switch (state) { case checkFirst: if (!condition.apply(entry)) { - callback.findEntryComplete(null, OpFindNewest.this.ctx); + callback.findEntryComplete(startPosition, OpFindNewest.this.ctx); return; } else { lastMatchedPosition = position; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 4592e32..fc3a8cb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1645,7 +1645,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { 
ledger.addEntry("not-expired".getBytes(Encoding)); ledger.addEntry("not-expired".getBytes(Encoding)); - assertNull( + assertEquals(c1.readPosition, c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)))); } @@ -2108,7 +2108,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ManagedLedger ledger = factory.open(ledgerAndCursorName, config); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); - ledger.addEntry(getEntryPublishTime("retained1")); + Position firstPosition = ledger.addEntry(getEntryPublishTime("retained1")); // space apart message publish times Thread.sleep(100); ledger.addEntry(getEntryPublishTime("retained2")); @@ -2135,6 +2135,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { PositionImpl found = (PositionImpl) findPositionFromAllEntries(c1, timestamp); assertEquals(found.getLedgerId(), ledgerId); assertEquals(found.getEntryId(), expectedEntryId); + + found = (PositionImpl) findPositionFromAllEntries(c1, 0); + assertEquals(found, firstPosition); } @Test(timeOut = 20000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index b2c2246..377e0ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -154,7 +154,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { future = findMessage(result, c1, beginTimestamp); future.get(); assertEquals(result.exception, null); - assertEquals(result.position, null); + assertEquals(result.position, c1.getFirstPosition()); result.reset(); future = findMessage(result, c1, endTimestamp); diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 9db5bf2..486bea0 
100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -137,6 +137,13 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>com.facebook.presto</groupId> + <artifactId>presto-jdbc</artifactId> + <version>${presto.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java index 2dd4d4e..71ebb48 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java @@ -37,6 +37,7 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer -1, PRESTO_HTTP_PORT, "/v1/node"); + } @Override @@ -50,4 +51,8 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer ); } } + + public String getUrl() { + return String.format("%s:%s", getContainerIpAddress(), getMappedPort(PrestoWorkerContainer.PRESTO_HTTP_PORT)); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index d759023..093f3eb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -32,6 +32,15 @@ import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import 
java.util.LinkedList; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; @Slf4j @@ -84,7 +93,12 @@ public class TestBasicPresto extends PulsarTestSuite { .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build(); - final String stocksTopic = "stocks"; + String stocksTopic; + if (isBatched) { + stocksTopic = "stocks_batched"; + } else { + stocksTopic = "stocks_nonbatched"; + } @Cleanup Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class)) @@ -96,6 +110,7 @@ public class TestBasicPresto extends PulsarTestSuite { final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10); producer.send(stock); } + producer.flush(); result = execQuery("show schemas in pulsar;"); assertThat(result.getExitCode()).isEqualTo(0); @@ -105,7 +120,7 @@ public class TestBasicPresto extends PulsarTestSuite { assertThat(result.getExitCode()).isEqualTo(0); assertThat(result.getStdout()).contains("stocks"); - ContainerExecResult containerExecResult = execQuery("select * from pulsar.\"public/default\".stocks order by entryid;"); + ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic)); assertThat(containerExecResult.getExitCode()).isEqualTo(0); log.info("select sql query output \n{}", containerExecResult.getStdout()); String[] split = containerExecResult.getStdout().split("\n"); @@ -119,6 +134,67 @@ public class TestBasicPresto extends PulsarTestSuite { assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); } + // test predicate pushdown + + String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); + Connection connection = DriverManager.getConnection(url, "test", null); + + String query = String.format("select * from pulsar" + + ".\"public/default\".%s order by __publish_time__", stocksTopic); + log.info("Executing query: {}", query); + ResultSet res = 
connection.createStatement().executeQuery(query); + + List<Timestamp> timestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + timestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2); + + query = String.format("select * from pulsar" + + ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2)); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + List<Timestamp> returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2); + + // Try with a predicate that has an earlier time than any entry + // Should return all rows + query = String.format("select * from pulsar" + + ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size()); + + // Try with a predicate that has a later time than any entry + // Should return no rows + + query = String.format("select * from pulsar" + + ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(returnedTimestamps.size()).isEqualTo(0); }
@AfterSuite @@ -137,4 +213,16 @@ public class TestBasicPresto extends PulsarTestSuite { } + private static void printCurrent(ResultSet rs) throws SQLException { + ResultSetMetaData rsmd = rs.getMetaData(); + int columnsNumber = rsmd.getColumnCount(); + for (int i = 1; i <= columnsNumber; i++) { + if (i > 1) System.out.print(", "); + String columnValue = rs.getString(i); + System.out.print(columnValue + " " + rsmd.getColumnName(i)); + } + System.out.println(""); + + } + }