Repository: phoenix
Updated Branches:
  refs/heads/master 545abe535 -> 36a41c86a


PHOENIX-1103 ChunkedResultIterator compat with hash joins

Remove the special case for hash joins in the
ChunkedResultIterator. Once the chunk size is reached, continue
scanning until the row key changes. This commit also adds a
test to HashJoinIT that runs with a small chunk size.
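
To make the new rule concrete: a chunk now ends only once the configured size
has been reached AND the next row key differs from the last one returned, so
rows that share a row key (as hash join output can) never straddle a chunk
boundary. Below is a minimal, self-contained sketch of that rule; the
ChunkBoundarySketch class and plain byte[] keys are illustrative assumptions,
while the committed code operates on Phoenix Tuples inside
SingleChunkResultIterator (see the diff below).

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class ChunkBoundarySketch {

    // Returns the index at which the current chunk ends: at least chunkSize
    // rows past start, extended until the row key changes so that rows
    // sharing a key (e.g. hash join output) stay within one chunk and the
    // next chunk can restart cleanly at the next row key.
    static int chunkEnd(List<byte[]> rowKeys, int start, int chunkSize) {
        int i = Math.min(start + chunkSize, rowKeys.size());
        while (i > 0 && i < rowKeys.size()
                && Arrays.equals(rowKeys.get(i), rowKeys.get(i - 1))) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        List<byte[]> keys = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8), // duplicate key, e.g. two hash join results
                "c".getBytes(StandardCharsets.UTF_8));
        // With a nominal chunk size of 2 the first chunk is [a, b, b]: it
        // overruns the size so the next chunk can start cleanly at key "c".
        System.out.println(chunkEnd(keys, 0, 2)); // prints 3
    }
}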


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/36a41c86
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/36a41c86
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/36a41c86

Branch: refs/heads/master
Commit: 36a41c86a4cdb16ab91b95882dbdb02e6006831d
Parents: 545abe5
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jul 21 18:06:05 2014 +0200
Committer: Gabriel Reid <gabri...@ngdata.com>
Committed: Wed Jul 23 15:45:36 2014 +0200

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 78 +++++++++++++++++++-
 .../phoenix/iterate/ChunkedResultIterator.java  | 39 ++++++----
 .../org/apache/phoenix/join/HashJoinInfo.java   | 11 ---
 .../phoenix/query/QueryServicesOptions.java     |  5 +-
 4 files changed, 105 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 99a3d9d..4ed85cb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -105,7 +105,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         }
     }
     
-    @Parameters(name="{0}")
+    @Parameters
     public static Collection<Object> data() {
         List<Object> testCases = Lists.newArrayList();
         testCases.add(new String[][] {
@@ -2142,7 +2142,83 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         String query = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
             + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
             + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S3");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    // Basically a copy of testMultiRightJoin, but with a very small result scan chunk size
+    // to test that repeated row keys within a single chunk are handled properly
+    @Test
+    public void testMultiRightJoin_SmallChunkSize() throws Exception {
+        String query = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
         Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
             PreparedStatement statement = conn.prepareStatement(query);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 92080eb..cfaca84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.PhoenixIOException;
-import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
@@ -60,19 +59,11 @@ public class ChunkedResultIterator implements PeekingResultIterator {
 
         @Override
         public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
-            // TODO It doesn't seem right to do this selection here, but it's not currently clear
-            // where a better place is to do it
-            // For a HashJoin the scan can't be restarted where it left off, so we don't use
-            // a ChunkedResultIterator
-            if (HashJoinInfo.isHashJoin(context.getScan())) {
-                return delegateFactory.newIterator(context, scanner);
-            } else {
-               scanner.close(); //close the iterator since we don't need it anymore.
-                return new ChunkedResultIterator(delegateFactory, context, tableRef,
-                        context.getConnection().getQueryServices().getProps().getLong(
-                                            QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                            QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
-            }
+            scanner.close(); //close the iterator since we don't need it anymore.
+            return new ChunkedResultIterator(delegateFactory, context, tableRef,
+                    context.getConnection().getQueryServices().getProps().getLong(
+                                        QueryServices.SCAN_RESULT_CHUNK_SIZE,
+                                        QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
         }
     }
 
@@ -136,6 +127,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private static class SingleChunkResultIterator implements ResultIterator {
 
         private int rowCount = 0;
+        private boolean chunkComplete;
         private boolean endOfStreamReached;
         private Tuple lastTuple;
         private final ResultIterator delegate;
@@ -153,6 +145,14 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             }
             Tuple next = delegate.next();
             if (next != null) {
+                // We actually keep going past the chunk size until the row key changes. This is
+                // necessary for (at least) hash joins, as they can return multiple rows with the
+                // same row key. Stopping a chunk at a row key boundary is necessary in order to
+                // be able to start the next chunk on the next row key
+                if (rowCount >= chunkSize && rowKeyChanged(lastTuple, next)) {
+                    chunkComplete = true;
+                    return null;
+                }
                 lastTuple = next;
                 rowCount++;
             } else {
@@ -175,7 +175,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
          * Returns true if the current chunk has been fully iterated over.
          */
         public boolean isChunkComplete() {
-            return rowCount == chunkSize;
+            return chunkComplete;
         }
 
         /**
@@ -193,5 +193,14 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             lastTuple.getKey(keyPtr);
             return keyPtr.get();
         }
+
+        private boolean rowKeyChanged(Tuple lastTuple, Tuple newTuple) {
+            ImmutableBytesWritable oldKeyPtr = new ImmutableBytesWritable();
+            ImmutableBytesWritable newKeyPtr = new ImmutableBytesWritable();
+            lastTuple.getKey(oldKeyPtr);
+            newTuple.getKey(newKeyPtr);
+
+            return oldKeyPtr.compareTo(newKeyPtr) != 0;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index ce336b8..d00e802 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -224,15 +224,4 @@ public class HashJoinInfo {
             }
         }
     }
-
-    /**
-     * Check if a scan is intended for completing a HashJoin.
-     *
-     * @param scan the scan to be checked
-     * @return {@code true} if the scan is to be used for a HashJoin, otherwise {@code false}
-     */
-    public static boolean isHashJoin(Scan scan) {
-        return scan.getAttribute(HASH_JOIN) != null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36a41c86/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1569a88..33ee94c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -101,7 +101,10 @@ public class QueryServicesOptions {
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
-    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 1000L;
+    // We make the default chunk size one row smaller than the default scan cache size because
+    // one extra row is typically read and discarded by the ChunkedResultIterator, and we don't
+    // want to fill up a whole new cache to read a single extra record
+    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = DEFAULT_SCAN_CACHE_SIZE - 1L;
     
     // 
     // Spillable GroupBy - SPGBY prefix
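
For completeness, the chunk size can be overridden per connection, exactly as
the new HashJoinIT case does with QueryServices.SCAN_RESULT_CHUNK_SIZE. A
minimal client sketch follows, assuming a hypothetical jdbc:phoenix:localhost
URL and an existing MY_TABLE:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import org.apache.phoenix.query.QueryServices;

public class SmallChunkScan {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Force one-row chunks so duplicate row keys (e.g. hash join output)
        // are carried past the nominal chunk boundary, as this commit allows.
        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
        // Hypothetical connection URL and table name; substitute your own.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}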
