Repository: phoenix
Updated Branches:
  refs/heads/master 545abe535 -> 36a41c86a

PHOENIX-1103 ChunkedResultIterator compat with hash joins

Remove the special case for hash joins in the ChunkedResultIterator.
Once the chunk size is reached, continue scanning until the row key
changes. This commit also adds a test in HashJoinIT that runs with a
very small chunk size.

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/36a41c86
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/36a41c86
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/36a41c86

Branch: refs/heads/master
Commit: 36a41c86a4cdb16ab91b95882dbdb02e6006831d
Parents: 545abe5
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jul 21 18:06:05 2014 +0200
Committer: Gabriel Reid <gabri...@ngdata.com>
Committed: Wed Jul 23 15:45:36 2014 +0200

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 78 +++++++++++++++++++-
 .../phoenix/iterate/ChunkedResultIterator.java  | 39 ++++++----
 .../org/apache/phoenix/join/HashJoinInfo.java   | 11 ---
 .../phoenix/query/QueryServicesOptions.java     |  5 +-
 4 files changed, 105 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 99a3d9d..4ed85cb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -105,7 +105,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         }
     }
 
-    @Parameters(name="{0}")
+    @Parameters
     public static Collection<Object> data() {
         List<Object> testCases = Lists.newArrayList();
         testCases.add(new String[][] {
@@ -2142,7 +2142,83 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
 
         String query = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
             + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
             + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S3");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    // Basically a copy of testMultiRightJoin, but with a very small result scan chunk size
+    // to test that repeated row keys within a single chunk are handled properly
+    @Test
+    public void testMultiRightJoin_SmallChunkSize() throws Exception {
+        String query = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+            + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+        Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
             PreparedStatement statement = conn.prepareStatement(query);
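
A note on the new test above: SCAN_RESULT_CHUNK_SIZE is read from the
connection's client-side properties, so a single connection can shrink it
the way the test does. A minimal stand-alone sketch of the same pattern
(the JDBC URL and table name below are placeholders, not taken from this
commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    import org.apache.phoenix.query.QueryServices;

    public class SmallChunkSizeExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Force a chunk boundary check after every row, so that hash-join
            // results with repeated row keys span chunk boundaries.
            props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM my_table")) {
                while (rs.next()) {
                    // rows stream back in chunks of (at least) one row each
                }
            }
        }
    }
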
"T1"); + assertEquals(rs.getString(3), "S1"); + assertEquals(rs.getInt(4), 1000); + assertNotNull(rs.getDate(5)); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000002"); + assertEquals(rs.getString(2), "T6"); + assertEquals(rs.getString(3), "S6"); + assertEquals(rs.getInt(4), 2000); + assertNotNull(rs.getDate(5)); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000003"); + assertEquals(rs.getString(2), "T2"); + assertEquals(rs.getString(3), "S1"); + assertEquals(rs.getInt(4), 3000); + assertNotNull(rs.getDate(5)); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000004"); + assertEquals(rs.getString(2), "T6"); + assertEquals(rs.getString(3), "S6"); + assertEquals(rs.getInt(4), 4000); + assertNotNull(rs.getDate(5)); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000005"); + assertEquals(rs.getString(2), "T3"); + assertEquals(rs.getString(3), "S2"); + assertEquals(rs.getInt(4), 5000); + assertNotNull(rs.getDate(5)); + + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + // Basically a copy of testMultiRightJoin, but with a very small result scan chunk size + // to test that repeated row keys within a single chunk are handled properly + @Test + public void testMultiRightJoin_SmallChunkSize() throws Exception { + String query = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " + + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN " + + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC"; + Properties props = new Properties(TEST_PROPERTIES); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1"); Connection conn = DriverManager.getConnection(getUrl(), props); try { PreparedStatement statement = conn.prepareStatement(query); http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index 92080eb..cfaca84 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.exception.PhoenixIOException; -import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; @@ -60,19 +59,11 @@ public class ChunkedResultIterator implements PeekingResultIterator { @Override public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { - // TODO It doesn't seem right to do this selection here, but it's not currently clear - // where a better place is to do it - // For a HashJoin the scan can't be restarted where it left off, so we don't use - // a ChunkedResultIterator - if (HashJoinInfo.isHashJoin(context.getScan())) { - return delegateFactory.newIterator(context, scanner); - } else { - scanner.close(); //close the iterator since we don't need it 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index ce336b8..d00e802 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -224,15 +224,4 @@ public class HashJoinInfo {
             }
         }
     }
-
-    /**
-     * Check if a scan is intended for completing a HashJoin.
-     *
-     * @param scan the scan to be checked
-     * @return {@code true} if the scan is to be used for a HashJoin, otherwise {@code false}
-     */
-    public static boolean isHashJoin(Scan scan) {
-        return scan.getAttribute(HASH_JOIN) != null;
-    }
-
 }
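
For any caller that still needs the removed check: it reduced to testing for
the presence of the hash join attribute on the scan. A sketch of the
equivalent follows; the attribute key below is a hypothetical placeholder,
since the real constant lives inside HashJoinInfo and may not be visible to
outside callers.

    import org.apache.hadoop.hbase.client.Scan;

    public class HashJoinScans {
        // Hypothetical stand-in for HashJoinInfo's internal HASH_JOIN
        // attribute key; the actual key value is not shown in this diff.
        private static final String HASH_JOIN_ATTRIB = "HashJoin";

        public static boolean isHashJoin(Scan scan) {
            return scan.getAttribute(HASH_JOIN_ATTRIB) != null;
        }
    }
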
http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1569a88..33ee94c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -101,7 +101,10 @@ public class QueryServicesOptions {
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
-    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 1000L;
+    // We make the default chunk size one row smaller than the default scan cache size because
+    // one extra row is typically read and discarded by the ChunkedResultIterator, and we don't
+    // want to fill up a whole new cache to read a single extra record
+    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = DEFAULT_SCAN_CACHE_SIZE - 1L;
 
     //
     // Spillable GroupBy - SPGBY prefix
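
The comment's arithmetic, spelled out under the assumption that
DEFAULT_SCAN_CACHE_SIZE is 1000 (its value elsewhere in this class): a chunk
of 999 rows plus the single extra row read to detect the row key boundary
adds up to exactly one scanner cache fill, instead of 1000 + 1 rows spilling
into a second fill for one record.

    public class ChunkSizeArithmetic {
        // Assumed value of QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE
        static final int SCAN_CACHE_SIZE = 1000;
        static final long CHUNK_SIZE = SCAN_CACHE_SIZE - 1L; // 999

        public static void main(String[] args) {
            // 999 chunk rows + 1 boundary row = 1000 rows, one cache fill
            long rowsReadPerChunk = CHUNK_SIZE + 1;
            System.out.println(rowsReadPerChunk + " rows = one cache fill of " + SCAN_CACHE_SIZE);
        }
    }
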