haridsv commented on code in PR #1964:
URL: https://github.com/apache/phoenix/pull/1964#discussion_r1736333194


##########
phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java:
##########
@@ -1946,27 +1946,32 @@ public MutationState createCDC(CreateCDCStatement 
statement) throws SQLException
         Map<String, Object> commonFamilyProps = 
Maps.newHashMapWithExpectedSize(
                 statement.getProps().size() + 1);
         populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.CDC);
-
-        PhoenixStatement pstmt = new PhoenixStatement(connection);
-        String dataTableFullName = 
SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
-                statement.getDataTable().getTableName());
-        String createIndexSql = "CREATE UNCOVERED INDEX " +
-                (statement.isIfNotExists() ? "IF NOT EXISTS " : "") +
-                "\"" + 
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName()) + "\"" +
-                " ON " + dataTableFullName + " (" + 
PhoenixRowTimestampFunction.NAME + "()) ASYNC";
-        List<String> indexProps = new ArrayList<>();
-        Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
-        if (saltBucketNum != null) {
-            indexProps.add("SALT_BUCKETS=" + saltBucketNum);
-        }
-        Object columnEncodedBytes = 
TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
-        if (columnEncodedBytes != null) {
-            indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
-        }
-        createIndexSql = createIndexSql + " " + String.join(", ", indexProps);
-        try {
-            pstmt.execute(createIndexSql);
-        } catch (SQLException e) {
+        Properties props = connection.getClientInfo();
+        props.put(INDEX_CREATE_DEFAULT_STATE, "ACTIVE");
+        try (Connection internalConnection = QueryUtil.getConnection(props, 
connection.getQueryServices().getConfiguration())) {
+            PhoenixStatement pstmt = new PhoenixStatement((PhoenixConnection) 
internalConnection);

Review Comment:
   Nit suggestion: If we move this down to line 1973, then the scope of the try 
block can be made much smaller.



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java:
##########
@@ -701,12 +660,143 @@ public void testSelectCDCFailDataTableUpdate() throws 
Exception {
 
         long startTS = System.currentTimeMillis();
         generateChanges(startTS, tenantids, tableName, null,
-                COMMIT_FAILURE_EXPECTED);
+                COMMIT_FAILURE_EXPECTED, "v3");
 
         try (Connection conn = newConnection(tenantId)) {
             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
" +
                     SchemaUtil.getTableName(schemaName, cdcName));
             assertEquals(false, rs.next());
+
+        }
+    }
+
+    @Test
+    public void testCDCIndexBuildAndVerification() throws Exception {
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = generateUniqueName();
+        String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        String cdcName, cdc_sql;
+        try (Connection conn = newConnection()) {
+            // Create a table and add some rows
+            createTable(conn, "CREATE TABLE  " + tableFullName + " (" + 
(multitenant ?
+                    "TENANT_ID CHAR(5) NOT NULL, " :
+                    "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 
INTEGER, B.vb INTEGER, "
+                    + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+                    "(TENANT_ID, k) " :
+                    "(k)") + ")", encodingScheme, multitenant, 
tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = generateUniqueName();
+                String viewFullName = SchemaUtil.getTableName(schemaName, 
viewName);
+                createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT 
* FROM " + tableFullName,
+                        encodingScheme);
+                tableName = viewName;
+                tableFullName = viewFullName;
+            }
+
+            String tenantId = multitenant ? "1000" : null;
+            String[] tenantids = { tenantId };
+            if (multitenant) {
+                tenantids = new String[] { tenantId, "2000" };
+            }
+
+            long startTS = System.currentTimeMillis();
+            generateChanges(startTS, tenantids, tableFullName, tableFullName, 
COMMIT_SUCCESS, null);

Review Comment:
   How about using the newer `generateMutations` method along with a simpler 
table schema (the number and types of the columns are not important, right?)? 
The same applies to the other new test as well.



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java:
##########
@@ -701,12 +660,143 @@ public void testSelectCDCFailDataTableUpdate() throws 
Exception {
 
         long startTS = System.currentTimeMillis();
         generateChanges(startTS, tenantids, tableName, null,
-                COMMIT_FAILURE_EXPECTED);
+                COMMIT_FAILURE_EXPECTED, "v3");
 
         try (Connection conn = newConnection(tenantId)) {
             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
" +
                     SchemaUtil.getTableName(schemaName, cdcName));
             assertEquals(false, rs.next());
+
+        }
+    }
+
+    @Test
+    public void testCDCIndexBuildAndVerification() throws Exception {
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = generateUniqueName();
+        String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        String cdcName, cdc_sql;

Review Comment:
   Nit cleanup: Declaring these up front was needed before because of the 
`dataBeforeCDC` test param. Now that the param is removed, these can be made 
inline. This is also applicable to the existing tests.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java:
##########
@@ -294,6 +294,10 @@ private List<QueryPlan> 
getApplicablePlansForSingleFlatQuery(QueryPlan dataPlan,
         }
         
         for (PTable index : indexes) {
+            if (CDCUtil.isCDCIndex(index) && !forCDC) {
+                // A CDC index is allowed only for the queries on its CDC table

Review Comment:
   How about clarifying why? Something like this:
   ```suggestion
                   // A CDC index is allowed only for the queries on its CDC 
table because a CDC index may not be built completely for regular queries.
   ```



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java:
##########
@@ -701,12 +660,143 @@ public void testSelectCDCFailDataTableUpdate() throws 
Exception {
 
         long startTS = System.currentTimeMillis();
         generateChanges(startTS, tenantids, tableName, null,
-                COMMIT_FAILURE_EXPECTED);
+                COMMIT_FAILURE_EXPECTED, "v3");
 
         try (Connection conn = newConnection(tenantId)) {
             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
" +
                     SchemaUtil.getTableName(schemaName, cdcName));
             assertEquals(false, rs.next());
+
+        }
+    }
+
+    @Test
+    public void testCDCIndexBuildAndVerification() throws Exception {
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = generateUniqueName();
+        String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        String cdcName, cdc_sql;
+        try (Connection conn = newConnection()) {
+            // Create a table and add some rows
+            createTable(conn, "CREATE TABLE  " + tableFullName + " (" + 
(multitenant ?
+                    "TENANT_ID CHAR(5) NOT NULL, " :
+                    "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 
INTEGER, B.vb INTEGER, "
+                    + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+                    "(TENANT_ID, k) " :
+                    "(k)") + ")", encodingScheme, multitenant, 
tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = generateUniqueName();
+                String viewFullName = SchemaUtil.getTableName(schemaName, 
viewName);
+                createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT 
* FROM " + tableFullName,
+                        encodingScheme);
+                tableName = viewName;
+                tableFullName = viewFullName;
+            }
+
+            String tenantId = multitenant ? "1000" : null;
+            String[] tenantids = { tenantId };
+            if (multitenant) {
+                tenantids = new String[] { tenantId, "2000" };
+            }
+
+            long startTS = System.currentTimeMillis();
+            generateChanges(startTS, tenantids, tableFullName, tableFullName, 
COMMIT_SUCCESS, null);
+            EnvironmentEdgeManager.reset();
+            // Create a CDC table
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            // Check CDC index is active but empty
+            String indexTableFullName = SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName));
+            PTable indexTable = ((PhoenixConnection) 
conn).getTableNoCache(indexTableFullName);
+            assertEquals(indexTable.getIndexState(), PIndexState.ACTIVE);
+            TestUtil.assertRawRowCount(conn,
+                    
TableName.valueOf(indexTable.getPhysicalName().getString()),0);
+            // Rebuild the index and verify that it is still empty
+            IndexToolIT.runIndexTool(false, schemaName, tableName,
+                    CDCUtil.getCDCIndexName(cdcName));
+            TestUtil.assertRawRowCount(conn,
+                    
TableName.valueOf(indexTable.getPhysicalName().getString()),0);
+            // Add more rows
+            startTS = System.currentTimeMillis();
+            generateChanges(startTS, tenantids, tableFullName, tableFullName, 
COMMIT_SUCCESS, null);
+            // Verify CDC index verification pass
+            IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, 
tableName,
+                    CDCUtil.getCDCIndexName(cdcName), null, 0, 
IndexTool.IndexVerifyType.ONLY);
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+
+        }
+    }
+
+    @Test
+    public void testCDCIndexTTLEqualsToMaxLookbackAge() throws Exception {
+        if (forView) {
+            // Except for views
+            return;
+        }
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = generateUniqueName();
+        String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        String cdcName, cdc_sql;
+        try (Connection conn = newConnection()) {
+            // Create a table
+            createTable(conn, "CREATE TABLE  " + tableFullName + " (" + 
(multitenant ?
+                    "TENANT_ID CHAR(5) NOT NULL, " :
+                    "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 
INTEGER, B.vb INTEGER, "
+                    + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+                    "(TENANT_ID, k) " :
+                    "(k)") + ")", encodingScheme, multitenant, 
tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = generateUniqueName();
+                String viewFullName = SchemaUtil.getTableName(schemaName, 
viewName);
+                createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT 
* FROM " + tableFullName,
+                        encodingScheme);
+                tableName = viewName;
+                tableFullName = viewFullName;
+            }
+
+            String tenantId = multitenant ? "1000" : null;
+            String[] tenantids = { tenantId };
+            if (multitenant) {
+                tenantids = new String[] { tenantId, "2000" };
+            }
+
+            // Create a CDC table
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            // Add rows
+            long startTS = System.currentTimeMillis();
+            List<ChangeRow> changes = generateChanges(startTS, tenantids, 
tableFullName,
+                    tableFullName, COMMIT_SUCCESS, null);
+            // Advance time by the max lookback age. This will cause all rows 
to expire

Review Comment:
   Have you considered adding another scenario that would expire only some of 
the rows?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to