[ 
https://issues.apache.org/jira/browse/PHOENIX-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478992#comment-17478992
 ] 

ASF GitHub Bot commented on PHOENIX-6622:
-----------------------------------------

gokceni commented on a change in pull request #1373:
URL: https://github.com/apache/phoenix/pull/1373#discussion_r788141661



##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
##########
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRawRowCount;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformMonitorIT extends ParallelStatsDisabledIT {
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    /**
+     * Sets the connection properties used by the tests in this class and looks up the
+     * {@link TaskRegionObserver} coprocessor environment.
+     * <p>
+     * The environment is taken from the first online region of the table named by
+     * {@link PhoenixDatabaseMetaData#SYSTEM_TASK_HBASE_TABLE_NAME} and stored in the static
+     * {@code TaskRegionEnvironment} field.
+     */
+    public TransformMonitorIT() throws IOException, InterruptedException {
+        // Default storage settings for tables created through testProps connections.
+        testProps.setProperty(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                "ONE_CELL_PER_COLUMN");
+        testProps.setProperty(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+        testProps.setProperty(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+
+        TaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .get(0)
+                .getCoprocessorHost()
+                .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+
+    /**
+     * Runs before each test: deletes all rows from the tables named by
+     * {@link PhoenixDatabaseMetaData#SYSTEM_TRANSFORM_NAME} and
+     * {@link PhoenixDatabaseMetaData#SYSTEM_TASK_NAME}, in that order, on an
+     * auto-commit connection.
+     */
+    @Before
+    public void setupTest() throws Exception {
+        final String[] cleanupStatements = {
+                "DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME,
+                "DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME
+        };
+        try (Connection cleanupConn = DriverManager.getConnection(getUrl(), testProps)) {
+            cleanupConn.setAutoCommit(true);
+            for (String sql : cleanupStatements) {
+                cleanupConn.createStatement().execute(sql);
+            }
+        }
+    }
+
+    /**
+     * Drives one end-to-end storage-scheme/column-encoding transform of a freshly created
+     * table and verifies the outcome.
+     * <p>
+     * The table is created with 10 rows (optionally with an index, or with a view plus a view
+     * index), altered to SINGLE_CELL_ARRAY_WITH_OFFSETS / COLUMN_ENCODED_BYTES=2, and the test
+     * waits for the transform record to reach COMPLETED before checking metadata and row
+     * counts against the new physical table.
+     *
+     * @param createIndex whether to create an index on the data table before the transform
+     * @param createView  whether to create a view and a view index before the transform; the
+     *                    post-transform view assertions only run when {@code createIndex}
+     *                    is false
+     * @param isImmutable whether the data table is created with IMMUTABLE_ROWS=true
+     */
+    private void testTransformTable(boolean createIndex, boolean createView,
+            boolean isImmutable) throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String newTableName = dataTableName + "_1";
+        String newTableFullName = SchemaUtil.getTableName(schemaName, newTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexName2 = "IDX_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String viewName2 = "VW2_" + generateUniqueName();
+        String viewIdxName = "VW_IDX_" + generateUniqueName();
+        String viewIdxName2 = "VW_IDX_" + generateUniqueName();
+        String view2IdxName1 = "VW2_IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String createIndexStmt =
+                "CREATE INDEX %s ON " + dataTableFullName + " (NAME) INCLUDE (ZIP) ";
+        String createViewStmt =
+                "CREATE VIEW %s ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM "
+                        + dataTableFullName;
+        // NOTE(review): this template always targets viewName, so the index created later
+        // for the "another view" step is also built on viewName, not viewName2 - confirm
+        // that this is intended.
+        String createViewIdxSql =
+                "CREATE INDEX  %s ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            int numOfRows = 10;
+            int additionalRows = 2;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows,
+                    isImmutable ? " IMMUTABLE_ROWS=true" : "");
+            if (createIndex) {
+                conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            }
+
+            if (createView) {
+                conn.createStatement().execute(String.format(createViewStmt, viewName));
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName));
+                conn.createStatement().execute("UPSERT INTO " + viewName
+                        + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (1, 'uname11', 100, 'viewCol2')");
+            }
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
+                    PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+            // Changing the storage scheme / encoding is what creates the transform record
+            // and the monitor task asserted below.
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName
+                    + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record =
+                    Transform.getTransformRecord(schemaName, dataTableName, null, null, pconn);
+            assertNotNull(record);
+
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(pconn, record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+            assertEquals(newTableName, oldTable.getPhysicalName(true).getString());
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
+                    PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS,
+                    record.getNewPhysicalTableName());
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
+                    PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, dataTableFullName);
+
+            ConnectionQueryServices cqs = pconn.getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))),
+                    newRowCount);
+
+            if (createIndex) {
+                assertEquals(newRowCount, countRows(conn, indexFullName));
+                // Upsert new rows to new table. Note that after transform is complete, we are
+                // using the new table
+                TransformToolIT.upsertRows(conn, dataTableFullName, (int)newRowCount+1,
+                        additionalRows);
+                assertEquals(newRowCount+additionalRows, countRows(conn, indexFullName));
+                // The old physical table must not have received the additional rows.
+                assertEquals(newRowCount,
+                        getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))));
+
+                // Create another index on the new table and count
+                disableAndDeleteHBaseTable(conn, dataTableFullName);
+                conn.createStatement().execute(String.format(createIndexStmt, indexName2));
+                assertEquals(newRowCount+additionalRows, countRows(conn, dataTableFullName));
+                assertEquals(newRowCount+additionalRows,
+                        countRows(conn, SchemaUtil.getTableName(schemaName, indexName2)));
+            } else if (createView) {
+                assertEquals(numOfRows, countRows(conn, viewName));
+                assertEquals(numOfRows, countRowsForViewIndex(conn, dataTableFullName));
+                assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
+                        PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
+                pconn.getQueryServices().clearCache();
+
+                // Upsert new rows to new table. Note that after transform is complete, we are
+                // using the new table
+                TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, getRowCount(conn, viewName));
+                assertEquals(newRowCount+additionalRows,
+                        countRowsForViewIndex(conn, dataTableFullName));
+
+                // Drop view index and create another on the new table and count
+                conn.createStatement().execute("DROP INDEX " + viewIdxName + " ON " + viewName);
+                disableAndDeleteHBaseTable(conn, dataTableFullName);
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName2));
+                assertEquals(newRowCount+additionalRows,
+                        countRowsForViewIndex(conn, dataTableFullName));
+
+                // Create another view and have a new index on top
+                conn.createStatement().execute(String.format(createViewStmt, viewName2));
+                conn.createStatement().execute(String.format(createViewIdxSql, view2IdxName1));
+                assertEquals((newRowCount+additionalRows)*2,
+                        countRowsForViewIndex(conn, dataTableFullName));
+
+                conn.createStatement().execute("UPSERT INTO " + viewName2
+                        + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 'viewCol100')");
+                try (ResultSet rs = conn.createStatement().executeQuery(
+                        "SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000")) {
+                    assertTrue(rs.next());
+                    assertEquals("viewCol100", rs.getString(1));
+                    assertEquals("uname100", rs.getString(2));
+                    assertFalse(rs.next());
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Disables and deletes the HBase table with the given name, closing the {@link Admin}
+     * handle afterwards so it is not leaked.
+     */
+    private static void disableAndDeleteHBaseTable(Connection conn, String tableFullName)
+            throws IOException, SQLException {
+        try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            TableName hTableName = TableName.valueOf(tableFullName);
+            admin.disableTable(hTableName);
+            admin.deleteTable(hTableName);
+        }
+    }
+
+    public static int countRows(Connection conn, String tableFullName) throws 
SQLException {
+        ResultSet count = conn.createStatement().executeQuery("select  /*+ 
NO_INDEX*/ count(*) from " + tableFullName);
+        count.next();
+        int numRows = count.getInt(1);
+        return numRows;
+    }
+
+    /**
+     * Counts the rows of the view index physical table that belongs to the given base table.
+     *
+     * @param conn      connection whose query services are used to open the HBase table
+     * @param baseTable name of the base table; the view index table name is derived from it
+     *                  with {@link MetaDataUtil#getViewIndexPhysicalName}
+     * @return the row count reported by the test utility for the view index table
+     */
+    protected int countRowsForViewIndex(Connection conn, String baseTable)
+            throws IOException, SQLException {
+        String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
+        ConnectionQueryServices queryServices =
+                conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+        // Close the table handle once counted so repeated calls do not leak handles.
+        try (Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName))) {
+            return getUtility().countRows(indexHTable);
+        }
+    }
+
+    /** Runs the transform scenario on a mutable table with neither an index nor a view. */
+    @Test
+    public void testTransformMonitor_mutableTableWithoutIndex() throws Exception {
+        final boolean withIndex = false;
+        final boolean withView = false;
+        final boolean immutableRows = false;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    /** Runs the transform scenario on an immutable table with neither an index nor a view. */
+    @Test
+    public void testTransformMonitor_immutableTableWithoutIndex() throws Exception {
+        final boolean withIndex = false;
+        final boolean withView = false;
+        final boolean immutableRows = true;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    /** Runs the transform scenario on an immutable table that has an index but no view. */
+    @Test
+    public void testTransformMonitor_immutableTableWithIndex() throws Exception {
+        final boolean withIndex = true;
+        final boolean withView = false;
+        final boolean immutableRows = true;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    @Test
+    public void testTransformMonitor_pausedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.PAUSED, 
PTable.TaskStatus.COMPLETED);

Review comment:
       Yes, the task status will be COMPLETED. Another task will be created when 
the transform resumes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TransformMonitor should orchestrate transform and do retries
> ------------------------------------------------------------
>
>                 Key: PHOENIX-6622
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6622
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Gokcen Iskender
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to