sanjeet006py commented on code in PR #2109:
URL: https://github.com/apache/phoenix/pull/2109#discussion_r2041496988


##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.salted;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+
+@Category(ParallelStatsEnabledIT.class)
+public class SaltedTableWithParallelStatsEnabledIT extends ParallelStatsEnabledIT {
+
+    private void testPhoenix7580(boolean withStatsForParallelization, boolean withFullTableScan,
+                                 boolean withPointLookups)
+            throws Exception {
+        String tableName = generateUniqueName();
+        int saltBucketCount = 5;
+        int rowsToInsert = saltBucketCount * 10;
+        String primaryKeyPrefix = "pk1_1";
+        // First 3 values are from salt bucket 0, the next 3 from salt bucket 1, and so on
+        // up to the last salt bucket. These values have been specifically selected so that
+        // we have values from all 5 salt buckets.
+        int[] pk2ValuesForPointLookups = new int[]
+                { 4, 9, 13, 0, 5, 14, 1, 6, 10, 2, 7, 11, 3, 8, 12 };
+        int pointLookupsPerSaltBkt = pk2ValuesForPointLookups.length / saltBucketCount;
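+        // (Phoenix computes the leading salt byte as a hash of the remaining row key
+        // modulo SALT_BUCKETS; see org.apache.phoenix.schema.SaltingUtil. That mapping
+        // is how the bucket assignment of the values above was determined.)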
+
+        String connProfile = "testRangeScanForPhoenix7580" + withStatsForParallelization;
+        Properties props = new Properties();
+        props.setProperty(QueryServices.USE_STATS_FOR_PARALLELIZATION,
+                withStatsForParallelization ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+        try (Connection conn = DriverManager.getConnection(getUrl(connProfile), props)) {
+            createTable(conn, tableName, saltBucketCount);
+            addRows(conn, tableName, primaryKeyPrefix, rowsToInsert);
+
+            // Run a COUNT(*) query on Phoenix (range scan with row key prefix
+            // {@code primaryKeyPrefix}, full table scan, or point lookups) and assert
+            // that the row count reported by Phoenix matches the row count in HBase.
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName);
+            } else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, pk2ValuesForPointLookups.length,
+                        tableName, saltBucketCount, primaryKeyPrefix, pk2ValuesForPointLookups,
+                        pointLookupsPerSaltBkt);
+            } else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName,
+                        saltBucketCount, primaryKeyPrefix);
+            }
+
+            // ********** Create conditions to trigger PHOENIX-7580 ***********
+
+            // Insert 3 rows with a row key prefix greater than the prefix of the rows
+            // inserted earlier.
+            String primaryKeyPrefixForNewRows = "pk1_2";
+            // These values have been carefully selected so that the newly inserted rows
+            // land in the second last salt bucket when the salt bucket count is 5.
+            int[] pk2ValuesForNewRows = new int[] { 1, 6, 10 };
+            triggerPhoenix7580(conn, tableName, saltBucketCount, primaryKeyPrefixForNewRows,
+                    pk2ValuesForNewRows);
+
+            // **** Conditions to trigger PHOENIX-7580 have been satisfied. Test the fix now. ****
+
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn,
+                        rowsToInsert + pk2ValuesForNewRows.length, tableName);
+            } else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, pk2ValuesForPointLookups.length,
+                        tableName, saltBucketCount, primaryKeyPrefix, pk2ValuesForPointLookups,
+                        pointLookupsPerSaltBkt);
+            } else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName,
+                        saltBucketCount, primaryKeyPrefix);
+            }
+        }
+    }
+
+    private void assertRangeScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                          String tableName, int saltBucketCount,
+                                                          String primaryKeyPrefix)
+            throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefix.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKeyPrefix, 1,
+                rowKeyPrefix.length - 1);
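+        // Byte 0 is left for the salt byte and is filled in per bucket in the loop below.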
+        for (int i = 0; i < saltBucketCount; i++) {
+            rowKeyPrefix[0] = (byte) i;
+            Scan scan = new Scan();
+            scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+            try (ResultScanner scanner = hTable.getScanner(scan)) {
+                while (scanner.next() != null) {
+                    rowCountFromHBase++;
+                }
+            }
+        }
+        // Assert all the rows are visible on running prefix scan from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String rangeScanDql = "SELECT COUNT(*) FROM " + tableName + " WHERE PK1=?";
+        try (PreparedStatement stmt = conn.prepareStatement(rangeScanDql)) {
+            stmt.setString(1, primaryKeyPrefix);
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all the rows are visible on running a range query from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void assertFullScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                         String tableName) throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        Scan scan = new Scan();
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            while (scanner.next() != null) {
+                rowCountFromHBase++;
+            }
+        }
+        // Assert all the rows are visible on full table scan from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String fullScanDql = "SELECT COUNT(*) FROM " + tableName;
+        try (PreparedStatement stmt = conn.prepareStatement(fullScanDql)) {
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all the rows are visible on full table scan from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void assertPointLookupsRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                             String tableName, int saltBucketCount,
+                                                             String firstPrimaryKey,
+                                                             int[] pk2Values, int rowsPerSaltBkt)
+            throws Exception {
+        String secondPrimaryKeyPrefix = "pk2_";
+        String primaryKeyPrefix = firstPrimaryKey + secondPrimaryKeyPrefix;
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKey = new byte[primaryKeyPrefix.length() + 3];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKey, 1,
+                rowKey.length - 3);
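+        // Row key layout: [salt byte][firstPrimaryKey + "pk2_"][2-digit pk2 value].
+        // Byte 0 and the last 2 bytes are filled in per row in the loops below.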
+        for (int i = 0; i < saltBucketCount; i++) {
+            for (int j = i * rowsPerSaltBkt; j < (i + 1) * rowsPerSaltBkt; j++) {
+                rowKey[0] = (byte) i;
+                byte[] rowKeySuffix = Bytes.toBytes(String.format("%02d", pk2Values[j]));
+                rowKey[rowKey.length - 2] = rowKeySuffix[0];
+                rowKey[rowKey.length - 1] = rowKeySuffix[1];
+                Get get = new Get(rowKey);
+                if (!hTable.get(get).isEmpty()) {
+                    rowCountFromHBase++;
+                }
+            }
+        }
+        // Assert all point lookups are visible from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        StringBuilder pointLookupDql = new StringBuilder("SELECT COUNT(*) FROM ");
+        pointLookupDql.append(tableName);
+        pointLookupDql.append(" WHERE PK1=? AND PK2 IN (?");
+        for (int i = 1; i < pk2Values.length; i++) {
+            pointLookupDql.append(",?");
+        }
+        pointLookupDql.append(")");
+        try (PreparedStatement stmt = conn.prepareStatement(pointLookupDql.toString())) {
+            stmt.setString(1, firstPrimaryKey);
+            for (int i = 0; i < pk2Values.length; i++) {
+                stmt.setString(i + 2,
+                        String.format(secondPrimaryKeyPrefix + "%02d", pk2Values[i]));
+            }
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all point lookups are visible from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void triggerPhoenix7580(Connection conn, String tableName, int saltBucketCount,
+                                    String primaryKeyPrefixForNewRows, int[] pk2ValuesForNewRows)
+            throws Exception {
+        String upsertDml = "UPSERT INTO " + tableName + " VALUES (?,?,?)";
+        try (PreparedStatement stmt = conn.prepareStatement(upsertDml)) {
+            for (int i: pk2ValuesForNewRows) {
+                stmt.setString(1, primaryKeyPrefixForNewRows);
+                stmt.setString(2, String.format("pk2_%02d", i));
+                stmt.setString(3, "col1_" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+        }
+
+        byte[] expectedEndKeyPrefixAfterSplit;
+        // Compute the split key for splitting the region corresponding to the second
+        // last salt bucket.
+        byte[] splitKey = null;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefixForNewRows.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefixForNewRows), 0,
+                rowKeyPrefix, 1, rowKeyPrefix.length - 1);
+        // Subtract 2 from the salt bucket count to get the second last bucket.
+        // Salt buckets are 0-indexed.
+        rowKeyPrefix[0] = (byte) (saltBucketCount - 2);
+        expectedEndKeyPrefixAfterSplit = new byte[rowKeyPrefix.length];
+        // Save this; it will be used at the end of this method to verify that the
+        // conditions to trigger PHOENIX-7580 are met.
+        System.arraycopy(rowKeyPrefix, 0,
+                expectedEndKeyPrefixAfterSplit, 0, rowKeyPrefix.length);
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        Scan scan = new Scan();
+        scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+        boolean pastFirstRow = false;
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            Result r;
+            while ((r = scanner.next()) != null) {
+                if (pastFirstRow) {
+                    // Use the row key of the 2nd of the 3 newly inserted rows as the
+                    // split key later, for splitting the region corresponding to the
+                    // second last salt bucket.
+                    splitKey = r.getRow();
+                    break;
+                }
+                pastFirstRow = true;
+            }
+        }
+
+        // Identify the region corresponding to the second last salt bucket for splitting
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        List<RegionInfo> regions = admin.getRegions(TableName.valueOf(tableName));
+        RegionInfo secondLastSaltBucketRegion = null;
+        for (RegionInfo regionInfo : regions) {
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey = regionInfo.getEndKey();
+            if (startKey.length > 0 && startKey[0] == saltBucketCount - 2
+                    && endKey.length > 0 && endKey[0] == saltBucketCount - 1) {
+                secondLastSaltBucketRegion = regionInfo;
+                break;
+            }
+        }
+        Assert.assertNotNull("Not able to determine region of second last salt bucket",
+                secondLastSaltBucketRegion);
+
+        // Split region corresponding to the second last salt bucket
+        admin.splitRegionAsync(secondLastSaltBucketRegion.getEncodedNameAsBytes(),
+                splitKey).get();
+        regions = admin.getRegions(TableName.valueOf(tableName));
+        // Verify that, after the split, the conditions to reproduce PHOENIX-7580 are met
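+        // (i.e. the end key of the first region in the second last salt bucket now
+        // extends past the salt byte + PK1 prefix, so the region boundary falls inside
+        // a salt bucket instead of on a salt bucket boundary).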
+        for (RegionInfo regionInfo : regions) {
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey = regionInfo.getEndKey();
+            if (startKey.length > 0 && startKey[0] == saltBucketCount - 2) {
+                Assert.assertTrue(
+                        Bytes.compareTo(expectedEndKeyPrefixAfterSplit, endKey) < 0);
+                break;
+            }
+        }
+    }
+
+    private void createTable(Connection conn, String tableName, int saltBucketCount)
+            throws Exception {
+        String createTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" +
+                "    PK1 CHAR(5) NOT NULL,\n" +
+                "    PK2 CHAR(6) NOT NULL,\n" +
+                "    COL1 VARCHAR,\n" +
+                "    CONSTRAINT PK PRIMARY KEY (\n" +
+                "        PK1,\n" +
+                "        PK2 \n" +
+                "    )\n" +
+                ") SALT_BUCKETS=" + saltBucketCount;
+        // Create table
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(createTableDdl);
+        }
+    }
+
+    private void addRows(Connection conn, String tableName, String primaryKeyPrefix,
+                         int rowsToInsert) throws Exception {
+        String upsertDml = "UPSERT INTO " + tableName + " VALUES (?,?,?)";
+        // Insert rows in the table with row key prefix at HBase level being
+        // {@code primaryKeyPrefix}.
+        try (PreparedStatement upsertStmt = conn.prepareStatement(upsertDml)) {
+            for (int i = 0; i < rowsToInsert; i++) {
+                upsertStmt.setString(1, primaryKeyPrefix);
+                upsertStmt.setString(2, String.format("pk2_%02d", i));
+                upsertStmt.setString(3, "col1_" + i);
+                upsertStmt.executeUpdate();
+            }
+            conn.commit();
+        }
+
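+        // Collect stats (guideposts) so that, when USE_STATS_FOR_PARALLELIZATION is
+        // enabled, scan parallelization is driven by guideposts rather than region
+        // boundaries alone.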
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("UPDATE STATISTICS " + tableName);
+        }
+    }
+
+    @Test
+    public void testRangeScanForPhoenix7580WithStatsForParallelization() throws Exception {
+        testPhoenix7580(true, false, false);
+    }
+
+    @Test
+    public void testRangeScanForPhoenix7580WithoutStatsForParallelization() throws Exception {
+        testPhoenix7580(false, false, false);
+    }
+
+    @Test
+    public void testFullTableScanWithStatsForParallelization() throws Exception {
+        testPhoenix7580(true, true, false);
+    }
+
+    @Test
+    public void testFullTableScanWithoutStatsForParallelization() throws Exception {
+        testPhoenix7580(false, true, false);
+    }
+
+    @Test
+    public void testPointLookupsWithStatsForParallelization() throws Exception {
+        testPhoenix7580(true, false, true);
+    }
+
+    @Test
+    public void testPointLookupsWithoutStatsForParallelization() throws Exception {
+        testPhoenix7580(false, false, true);
+    }

Review Comment:
   Made the test parameterized. Thanks for suggesting this; I hadn't considered using JUnit parameterized tests.
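   
   For reference, a minimal sketch of the JUnit 4 `Parameterized` shape being suggested (illustrative only; the class body and test names here are placeholders, not the exact code in this PR):
   
   ```java
   import java.util.Arrays;
   import java.util.Collection;
   
   import org.junit.Test;
   import org.junit.experimental.categories.Category;
   import org.junit.runner.RunWith;
   import org.junit.runners.Parameterized;
   
   @RunWith(Parameterized.class)
   @Category(ParallelStatsEnabledIT.class)
   public class SaltedTableWithParallelStatsEnabledIT extends ParallelStatsEnabledIT {
   
       // Run every @Test once with and once without stats for parallelization.
       @Parameterized.Parameters(name = "withStatsForParallelization={0}")
       public static Collection<Object[]> data() {
           return Arrays.asList(new Object[][] { { true }, { false } });
       }
   
       @Parameterized.Parameter
       public boolean withStatsForParallelization;
   
       @Test
       public void testRangeScanForPhoenix7580() throws Exception {
           testPhoenix7580(withStatsForParallelization, false, false);
       }
   }
   ```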



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
