This is an automated email from the ASF dual-hosted git repository. gokcen pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new be289fa PHOENIX-6649 TransformTool to support views and tenant views be289fa is described below commit be289fa92b7ab434e1c5a7771f0980d993937ca6 Author: Gokcen Iskender <gisken...@salesforce.com> AuthorDate: Wed Feb 16 14:25:41 2022 -0800 PHOENIX-6649 TransformTool to support views and tenant views Signed-off-by: Gokcen Iskender <gokc...@gmail.com> --- .../transform/TransformMonitorExtendedIT.java | 148 ++++++++++++++-- .../end2end/transform/TransformMonitorIT.java | 30 +++- .../phoenix/end2end/transform/TransformToolIT.java | 192 ++++++++++++++++++++- .../phoenix/mapreduce/PhoenixInputFormat.java | 2 +- .../PhoenixTransformWithViewsInputFormat.java | 133 ++++++++++++++ .../phoenix/mapreduce/transform/TransformTool.java | 33 +++- .../org/apache/phoenix/schema/MetaDataClient.java | 7 + .../apache/phoenix/schema/transform/Transform.java | 65 +++++-- .../schema/transform/TransformMaintainer.java | 10 +- 9 files changed, 579 insertions(+), 41 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java index 0251f56..57932e7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.end2end.transform; +import org.apache.hadoop.hbase.client.Admin; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -43,11 +44,13 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Map; import java.util.Properties; import static org.apache.phoenix.end2end.transform.TransformMonitorIT.waitForTransformToGetToState; +import static org.apache.phoenix.end2end.transform.TransformToolIT.getTenantConnection; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; @@ -65,12 +68,14 @@ public class TransformMonitorExtendedIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); + serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, - Integer.toString(60*60)); // An hour - serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString()); + Integer.toString(60 * 60)); // An hour + + serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString()); Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); - clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString()); + clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString()); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); @@ -103,9 +108,9 @@ public class TransformMonitorExtendedIT extends BaseTest { public void testTransformIndexWithNamespaceEnabled() throws Exception { String schemaName = "S_" + generateUniqueName(); String dataTableName = "TBL_" + generateUniqueName(); - String fullDataTableName = SchemaUtil.getTableName(schemaName , dataTableName); + String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName); String indexName = "IDX_" + generateUniqueName(); - String fullIndexName = SchemaUtil.getTableName(schemaName , indexName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); String createIndexStmt = "CREATE INDEX %s ON " + fullDataTableName + " (NAME) INCLUDE (ZIP) "; try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) { conn.setAutoCommit(true); @@ -129,13 +134,14 @@ public class TransformMonitorExtendedIT extends BaseTest { ResultSet rs = conn.createStatement().executeQuery("SELECT \":ID\", \"0:ZIP\" FROM " + fullIndexName); assertTrue(rs.next()); assertEquals("1", rs.getString(1)); - assertEquals( 95051, rs.getInt(2)); + assertEquals(95051, rs.getInt(2)); assertTrue(rs.next()); assertEquals("2", rs.getString(1)); - assertEquals( 95052, rs.getInt(2)); + assertEquals(95052, rs.getInt(2)); assertFalse(rs.next()); } } + @Test public void testTransformTableWithNamespaceEnabled() throws Exception { String schemaName = "S_" + generateUniqueName(); @@ -156,16 +162,138 @@ public class TransformMonitorExtendedIT extends BaseTest { waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); TransformToolIT.upsertRows(conn, fullDataTableName, 2, 1); - assertEquals(numOfRows+1, TransformMonitorIT.countRows(conn, fullDataTableName)); + assertEquals(numOfRows + 1, TransformMonitorIT.countRows(conn, fullDataTableName)); ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP FROM " + fullDataTableName); assertTrue(rs.next()); assertEquals("1", rs.getString(1)); - assertEquals( 95051, rs.getInt(2)); + assertEquals(95051, rs.getInt(2)); assertTrue(rs.next()); assertEquals("2", rs.getString(1)); - assertEquals( 95052, rs.getInt(2)); + assertEquals(95052, rs.getInt(2)); assertFalse(rs.next()); } } + + @Test + public void testTransformWithGlobalAndTenantViews() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName1 = generateUniqueName(); + String dataTableFullName1 = SchemaUtil.getTableName(schemaName, dataTableName1); + String namespaceMappedDataTableName1 = SchemaUtil.getPhysicalHBaseTableName(schemaName, dataTableName1, true).getString(); + String view1Name = SchemaUtil.getTableName(schemaName, "VW1_" + generateUniqueName()); + String view2Name = SchemaUtil.getTableName(schemaName, "VW2_" + generateUniqueName()); + String tenantView = SchemaUtil.getTableName(schemaName, "VWT_" + generateUniqueName()); + String readOnlyTenantView = SchemaUtil.getTableName(schemaName, "ROVWT_" + generateUniqueName()); + + try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) { + conn.setAutoCommit(true); + int numOfRows = 1; + conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); + TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName1, numOfRows, "TABLE_ONLY", dataTableDdl); + + SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName1); + + String createViewSql = "CREATE VIEW " + view1Name + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + + dataTableFullName1 + " where DATA='GLOBAL_VIEW' "; + conn.createStatement().execute(createViewSql); + PreparedStatement stmt1 = conn.prepareStatement(String.format("UPSERT INTO %s VALUES(?, ? , ?, ?, ?,?)", view1Name)); + stmt1.setInt(1, 2); + stmt1.setString(2, "uname2"); + stmt1.setInt(3, 95053); + stmt1.setString(4, "GLOBAL_VIEW"); + stmt1.setInt(5, 111); + stmt1.setString(6, "viewcol2"); + stmt1.executeUpdate(); + + createViewSql = "CREATE VIEW " + view2Name + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + + dataTableFullName1 + " where DATA='GLOBAL_VIEW' AND ZIP=95053"; + conn.createStatement().execute(createViewSql); + stmt1 = conn.prepareStatement(String.format("UPSERT INTO %s VALUES(?, ? , ?, ?, ?,?)", view1Name)); + stmt1.setInt(1, 20); + stmt1.setString(2, "uname22"); + stmt1.setInt(3, 95053); + stmt1.setString(4, "GLOBAL_VIEW"); + stmt1.setInt(5, 111); + stmt1.setString(6, "viewcol22"); + stmt1.executeUpdate(); + } + + try (Connection tenantConn1 = getTenantConnection("tenant1")) { + tenantConn1.setAutoCommit(true); + String createViewSql = "CREATE VIEW " + tenantView + " ( VIEW_TCOL1 INTEGER, VIEW_TCOL2 VARCHAR ) " + + " AS SELECT * FROM " + + dataTableFullName1 + " where DATA='TENANT_VIEW'"; + tenantConn1.createStatement().execute(createViewSql); + + PreparedStatement stmt1 = tenantConn1.prepareStatement( + String.format("UPSERT INTO %s (ID, NAME, ZIP, DATA, VIEW_TCOL1, VIEW_TCOL2) " + + "VALUES(?, ? , ?, ?, ?, ?)", tenantView)); + stmt1.setInt(1, 4); + stmt1.setString(2, "uname4"); + stmt1.setInt(3, 95054); + stmt1.setString(4, "TENANT_VIEW"); + stmt1.setInt(5, 2001); + stmt1.setString(6, "tenantviewcol"); + stmt1.executeUpdate(); + + // ZIP field values are like 95050 + i + createViewSql = "CREATE VIEW " + readOnlyTenantView + " ( VIEW_TCOL1 INTEGER, VIEW_TCOL2 VARCHAR ) AS SELECT * FROM " + + dataTableFullName1 + " where DATA='TENANT_VIEW' AND ZIP > 95050"; + tenantConn1.createStatement().execute(createViewSql); + } + + try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) { + conn.setAutoCommit(true); + conn.createStatement().execute("ALTER TABLE " + dataTableFullName1 + + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); + SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName1, null, null, conn.unwrap(PhoenixConnection.class)); + assertNotNull(record); + waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); + assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); + + try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + admin.disableTable(TableName.valueOf(namespaceMappedDataTableName1)); + admin.truncateTable(TableName.valueOf(namespaceMappedDataTableName1), true); + } + + SingleCellIndexIT.dumpTable(schemaName + ":" + dataTableName1 + "_1"); + + String sql = "SELECT VIEW_COL1, VIEW_COL2 FROM %s WHERE DATA='GLOBAL_VIEW' "; + ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, view1Name)); + assertTrue(rs1.next()); + assertEquals(111, rs1.getInt(1)); + assertEquals("viewcol2", rs1.getString(2)); + assertTrue(rs1.next()); + assertEquals("viewcol22", rs1.getString(2)); + assertFalse(rs1.next()); + + rs1 = conn.createStatement().executeQuery(String.format(sql, view2Name)); + assertTrue(rs1.next()); + assertEquals(111, rs1.getInt(1)); + assertEquals("viewcol2", rs1.getString(2)); + assertTrue(rs1.next()); + assertEquals("viewcol22", rs1.getString(2)); + assertFalse(rs1.next()); + + sql = "SELECT DATA FROM %s WHERE ID=1"; + rs1 = conn.createStatement().executeQuery(String.format(sql, dataTableFullName1)); + assertFalse(rs1.next()); + } + + try (Connection tenantConn1 = getTenantConnection("tenant1")) { + String sql = "SELECT VIEW_TCOL1, VIEW_TCOL2 FROM %s "; + ResultSet rs1 = tenantConn1.createStatement().executeQuery(String.format(sql, tenantView)); + + assertTrue(rs1.next()); + assertEquals(2001, rs1.getInt(1)); + assertEquals("tenantviewcol", rs1.getString(2)); + + ResultSet rs2 = tenantConn1.createStatement().executeQuery(String.format(sql, readOnlyTenantView)); + assertTrue(rs2.next()); + assertEquals(2001, rs2.getInt(1)); + assertEquals("tenantviewcol", rs2.getString(2)); + } + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java index 97c10e9..e5083d3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java @@ -169,6 +169,11 @@ public class TransformMonitorIT extends ParallelStatsDisabledIT { assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName); conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + ResultSet rs = conn.createStatement().executeQuery("SELECT VIEW_COL2 FROM " + viewName + " WHERE VIEW_COL1=100"); + assertTrue(rs.next()); + assertEquals("viewCol2", rs.getString(1)); + assertFalse(rs.next()); + int additionalRows = 2; // Upsert new rows to new table. Note that after transform is complete, we are using the new table TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, additionalRows); @@ -190,7 +195,7 @@ public class TransformMonitorIT extends ParallelStatsDisabledIT { assertEquals((newRowCount+additionalRows)*2, countRowsForViewIndex(conn, dataTableFullName)); conn.createStatement().execute("UPSERT INTO " + viewName2 + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 'viewCol100')"); - ResultSet rs = conn.createStatement().executeQuery("SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000"); + rs = conn.createStatement().executeQuery("SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000"); assertTrue(rs.next()); assertEquals("viewCol100", rs.getString(1)); assertEquals("uname100", rs.getString(2)); @@ -536,20 +541,23 @@ public class TransformMonitorIT extends ParallelStatsDisabledIT { try (Connection conn1 = DriverManager.getConnection(getUrl(), testProps)) { conn1.setAutoCommit(true); int numOfRows = 1; - TransformToolIT.createTableAndUpsertRows(conn1, dataTableName, numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : ""); + TransformToolIT.createTableAndUpsertRows(conn1, dataTableName, numOfRows, isImmutable ? " IMMUTABLE_ROWS=true" : ""); - conn1.createStatement().execute("ALTER TABLE " + dataTableName + - " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); - SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, conn1.unwrap(PhoenixConnection.class)); - assertNotNull(record); - waitForTransformToGetToState(conn1.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); - - // A connection does transform and another connection doesn't try to upsert into old table String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries"; try (Connection conn2 = DriverManager.getConnection(url2, PropertiesUtil.deepCopy(TEST_PROPERTIES))) { conn2.setAutoCommit(true); TransformToolIT.upsertRows(conn2, dataTableName, 2, 1); + conn1.createStatement().execute("ALTER TABLE " + dataTableName + + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); + SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, conn1.unwrap(PhoenixConnection.class)); + assertNotNull(record); + waitForTransformToGetToState(conn1.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); + assertMetadata(conn1, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); + + // A connection does transform and another connection doesn't try to upsert into old table + TransformToolIT.upsertRows(conn2, dataTableName, 3, 1); + ResultSet rs = conn2.createStatement().executeQuery("SELECT ID, NAME, ZIP FROM " + dataTableName); assertTrue(rs.next()); assertEquals("1", rs.getString(1)); @@ -559,6 +567,10 @@ public class TransformMonitorIT extends ParallelStatsDisabledIT { assertEquals("2", rs.getString(1)); assertEquals("uname2", rs.getString(2)); assertEquals( 95052, rs.getInt(3)); + assertTrue(rs.next()); + assertEquals("3", rs.getString(1)); + assertEquals("uname3", rs.getString(2)); + assertEquals( 95053, rs.getInt(3)); assertFalse(rs.next()); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java index 9ece889..6a1cc3e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.end2end.transform; +import org.apache.hadoop.hbase.client.Admin; import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; @@ -73,7 +74,6 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT; -import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES; import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES; import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; @@ -137,22 +137,33 @@ public class TransformToolIT extends ParallelStatsDisabledIT { } public static void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException { + createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "", tableOptions); + } + + public static void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String constantVal, String tableOptions) throws SQLException { String stmString1 = "CREATE TABLE IF NOT EXISTS " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, DATA VARCHAR) " + tableOptions; conn.createStatement().execute(stmString1); - upsertRows(conn, dataTableFullName, 1, numOfRows); + upsertRows(conn, dataTableFullName, 1, numOfRows, constantVal); conn.commit(); } public static void upsertRows(Connection conn, String dataTableFullName, int startIdx, int numOfRows) throws SQLException { - String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); - PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + upsertRows(conn, dataTableFullName, startIdx, numOfRows, ""); + } + public static void upsertRows(Connection conn, String dataTableFullName, int startIdx, int numOfRows, String constantVal) throws SQLException { + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?, ?)", dataTableFullName); + PreparedStatement stmt = conn.prepareStatement(upsertQuery); // insert rows for (int i = startIdx; i < startIdx+numOfRows; i++) { - IndexToolIT.upsertRow(stmt1, i); + stmt.setInt(1, i); + stmt.setString(2, "uname" + String.valueOf(i)); + stmt.setInt(3, 95050 + i); + stmt.setString(4, constantVal); + stmt.executeUpdate(); } } @Test @@ -948,6 +959,175 @@ public class TransformToolIT extends ParallelStatsDisabledIT { } } + @Test + public void testTransformForGlobalViews() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String view1Name = "VW1_" + generateUniqueName(); + String view2Name = "VW2_" + generateUniqueName(); + String upsertQuery = "UPSERT INTO %s VALUES(?, ?, ?, ?, ?, ?)"; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + int numOfRows = 0; + createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions); + SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName); + + String createViewSql = "CREATE VIEW " + view1Name + " ( VIEW_COL11 INTEGER, VIEW_COL12 VARCHAR ) AS SELECT * FROM " + + dataTableFullName + " where ID=1"; + conn.createStatement().execute(createViewSql); + + createViewSql = "CREATE VIEW " + view2Name + " ( VIEW_COL21 INTEGER, VIEW_COL22 VARCHAR ) AS SELECT * FROM " + + dataTableFullName + " where ID=11"; + conn.createStatement().execute(createViewSql); + + PreparedStatement stmt1 = conn.prepareStatement(String.format(upsertQuery, view1Name)); + stmt1.setInt(1, 1); + stmt1.setString(2, "uname1"); + stmt1.setInt(3, 95051); + stmt1.setString(4, ""); + stmt1.setInt(5, 101); + stmt1.setString(6, "viewCol12"); + stmt1.executeUpdate(); + conn.commit(); + + stmt1 = conn.prepareStatement(String.format(upsertQuery, view2Name)); + stmt1.setInt(1, 11); + stmt1.setString(2, "uname11"); + stmt1.setInt(3, 950511); + stmt1.setString(4, ""); + stmt1.setInt(5, 111); + stmt1.setString(6, "viewCol22"); + stmt1.executeUpdate(); + conn.commit(); + + conn.createStatement().execute("ALTER TABLE " + dataTableFullName + + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); + SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class)); + assertNotNull(record); + assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); + + List<String> args = getArgList(schemaName, dataTableName, null, + null, null, null, false, false, false, false, false); + runTransformTool(args.toArray(new String[0]), 0); + Transform.doCutover(conn.unwrap(PhoenixConnection.class), record); + Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); + try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + admin.disableTable(TableName.valueOf(dataTableFullName)); + admin.truncateTable(TableName.valueOf(dataTableFullName), true); + } + + String sql = "SELECT VIEW_COL11, VIEW_COL12 FROM %s "; + ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, view1Name)); + assertTrue(rs1.next()); + assertEquals(101, rs1.getInt(1)); + assertEquals("viewCol12", rs1.getString(2)); + + sql = "SELECT VIEW_COL21, VIEW_COL22 FROM %s "; + rs1 = conn.createStatement().executeQuery(String.format(sql, view2Name)); + assertTrue(rs1.next()); + assertEquals(111, rs1.getInt(1)); + assertEquals("viewCol22", rs1.getString(2)); + } + } + + @Test + public void testTransformForTenantViews() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String view1Name = "VW1_" + generateUniqueName(); + String view2Name = "VW2_" + generateUniqueName(); + String upsertQuery = "UPSERT INTO %s VALUES(?, ?, ?, ?, ?, ?)"; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + int numOfRows = 0; + createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions); + SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName); + } + + try (Connection tenantConn1 = getTenantConnection("tenant1")) { + String createViewSql = "CREATE VIEW " + view1Name + " ( VIEW_COL11 INTEGER, VIEW_COL12 VARCHAR ) AS SELECT * FROM " + + dataTableFullName + " where ID=1"; + tenantConn1.createStatement().execute(createViewSql); + } + + try (Connection tenantConn2 = getTenantConnection("tenant2")) { + String createViewSql = "CREATE VIEW " + view2Name + " ( VIEW_COL21 INTEGER, VIEW_COL22 VARCHAR ) AS SELECT * FROM " + + dataTableFullName + " where ID=11"; + tenantConn2.createStatement().execute(createViewSql); + } + + try (Connection tenantConn1 = getTenantConnection("tenant1")) { + PreparedStatement stmt1 = tenantConn1.prepareStatement(String.format(upsertQuery, view1Name)); + stmt1.setInt(1, 1); + stmt1.setString(2, "uname1"); + stmt1.setInt(3, 95051); + stmt1.setString(4, ""); + stmt1.setInt(5, 101); + stmt1.setString(6, "viewCol12"); + stmt1.executeUpdate(); + tenantConn1.commit(); + } + + try (Connection tenantConn2 = getTenantConnection("tenant2")) { + PreparedStatement stmt1 = tenantConn2.prepareStatement(String.format(upsertQuery, view2Name)); + stmt1.setInt(1, 11); + stmt1.setString(2, "uname11"); + stmt1.setInt(3, 950511); + stmt1.setString(4, ""); + stmt1.setInt(5, 111); + stmt1.setString(6, "viewCol22"); + stmt1.executeUpdate(); + tenantConn2.commit(); + } + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("ALTER TABLE " + dataTableFullName + + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); + SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class)); + assertNotNull(record); + assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); + + List<String> args = getArgList(schemaName, dataTableName, null, + null, null, null, false, false, false, false, false); + runTransformTool(args.toArray(new String[0]), 0); + Transform.doCutover(conn.unwrap(PhoenixConnection.class), record); + Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); + try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + admin.disableTable(TableName.valueOf(dataTableFullName)); + admin.truncateTable(TableName.valueOf(dataTableFullName), true); + } + } + + try (Connection tenantConn1 = getTenantConnection("tenant1")) { + String sql = "SELECT VIEW_COL11, VIEW_COL12 FROM %s "; + ResultSet rs1 = tenantConn1.createStatement().executeQuery(String.format(sql, view1Name)); + assertTrue(rs1.next()); + assertEquals(101, rs1.getInt(1)); + assertEquals("viewCol12", rs1.getString(2)); + } + + try (Connection tenantConn2 = getTenantConnection("tenant2")) { + String sql = "SELECT VIEW_COL21, VIEW_COL22 FROM %s "; + ResultSet rs1 = tenantConn2.createStatement().executeQuery(String.format(sql, view2Name)); + assertTrue(rs1.next()); + assertEquals(111, rs1.getInt(1)); + assertEquals("viewCol22", rs1.getString(2)); + } + } + + + public static Connection getTenantConnection(String tenant) throws SQLException { + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenant); + return DriverManager.getConnection(getUrl(), props); + } + public static void assertTransformStatusOrPartial(PTable.TransformStatus expectedStatus, SystemTransformRecord systemTransformRecord) { if (systemTransformRecord.getTransformStatus().equals(expectedStatus.name())) { return; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 68e3325..b21fd54 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -89,7 +89,7 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr return generateSplits(queryPlan, configuration); } - private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException { + protected List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException { // We must call this in order to initialize the scans and splits from the query plan setupParallelScansFromQueryPlan(qplan); final List<KeyRange> splits = qplan.getSplits(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java new file mode 100644 index 0000000..c968013 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.transform; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.ServerBuildTransformingTableCompiler; +import org.apache.phoenix.coprocessor.TableInfo; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.ViewInfoWritable; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.transform.Transform; +import org.apache.phoenix.thirdparty.com.google.common.base.Strings; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.ViewUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; + +public class PhoenixTransformWithViewsInputFormat<T extends DBWritable> extends PhoenixServerBuildIndexInputFormat { + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixTransformWithViewsInputFormat.class); + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + final Configuration configuration = context.getConfiguration(); + try (PhoenixConnection connection = (PhoenixConnection) + ConnectionUtil.getInputConnection(configuration)) { + try (Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices().getTable( + SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES, configuration).toBytes())) { + String oldDataTableFullName = PhoenixConfigurationUtil.getIndexToolDataTableName(configuration); + String newDataTableFullName = getIndexToolIndexTableName(configuration); + PTable newDataTable = PhoenixRuntime.getTableNoCache(connection, newDataTableFullName); + String schemaName = SchemaUtil.getSchemaNameFromFullName(oldDataTableFullName); + String tableName = SchemaUtil.getTableNameFromFullName(oldDataTableFullName); + byte[] schemaNameBytes = Strings.isNullOrEmpty(schemaName) ? null : schemaName.getBytes(); + Pair<List<PTable>, List<TableInfo>> allDescendantViews = ViewUtil.findAllDescendantViews(hTable, configuration, null, schemaNameBytes, + tableName.getBytes(), EnvironmentEdgeManager.currentTimeMillis(), false); + List<PTable> legitimateDecendants = allDescendantViews.getFirst(); + + List<InputSplit> inputSplits = new ArrayList<>(); + + HashMap<String, PColumn> columnMap = new HashMap<>(); + for (PColumn column : newDataTable.getColumns()) { + columnMap.put(column.getName().getString(), column); + } + + for (PTable decendant : legitimateDecendants) { + if (decendant.getViewType() == PTable.ViewType.READ_ONLY) { + continue; + } + PTable newView = Transform.getTransformedView(decendant, newDataTable, columnMap, true); + QueryPlan queryPlan = getQueryPlan(newView, decendant, connection); + inputSplits.addAll(generateSplits(queryPlan, configuration)); + } + if (inputSplits.size() == 0) { + // Get for base table + ServerBuildTransformingTableCompiler compiler = new ServerBuildTransformingTableCompiler(connection, + oldDataTableFullName); + MutationPlan plan = compiler.compile(newDataTable); + inputSplits.addAll(generateSplits(plan.getQueryPlan(), configuration)); + } + return inputSplits; + } + } catch (Exception e) { + LOGGER.error("PhoenixTransformWithViewsInputFormat failed with: " + e.getMessage()); + throw new RuntimeException(e); + } + } + + private QueryPlan getQueryPlan(PTable newTable, PTable oldTable, PhoenixConnection phoenixConnection) throws SQLException { + String tableTenantId = oldTable.getTenantId() == null? null:oldTable.getTenantId().getString(); + String connTenantId = phoenixConnection.getTenantId()==null? null:phoenixConnection.getTenantId().getString(); + if (!Strings.isNullOrEmpty(tableTenantId) && !StringUtils.equals(tableTenantId, connTenantId)) { + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tableTenantId); + + try (PhoenixConnection tenantConnection = (PhoenixConnection) + DriverManager.getConnection(phoenixConnection.getURL(), props)) { + return getQueryPlanInternal(newTable, oldTable, tenantConnection); + } + } + return getQueryPlanInternal(newTable, oldTable, phoenixConnection); + } + + private QueryPlan getQueryPlanInternal(PTable newTable, PTable decendant, PhoenixConnection phoenixConnection) throws SQLException { + ServerBuildTransformingTableCompiler compiler = new ServerBuildTransformingTableCompiler(phoenixConnection, + SchemaUtil.getTableName(decendant.getSchemaName(), decendant.getTableName()).getString()); + + MutationPlan plan = compiler.compile(newTable); + return plan.getQueryPlan(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java index a5fe085..9cba861 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.mapreduce.transform; +import org.apache.phoenix.mapreduce.PhoenixTTLTool; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.base.Strings; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; @@ -83,6 +84,7 @@ import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TransactionUtil; +import org.apache.phoenix.util.ViewUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +99,7 @@ import java.util.List; import java.util.UUID; import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES; import static org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables; import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet; import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange; @@ -171,6 +174,9 @@ public class TransformTool extends Configured implements Tool { private static final Option END_TIME_OPTION = new Option("et", "end-time", true, "End time for transform"); + private static final Option SPLIT_SIZE_OPTION = new Option("ms", "split-size-per-mapper", true, + "Define split size for each mapper."); + public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s.%s"; public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts " @@ -219,6 +225,7 @@ public class TransformTool extends Configured implements Tool { private boolean shouldFixUnverified; private boolean shouldUseNewTableAsSource; private boolean shouldForceCutover; + private int splitSize; private Job job; public Long getStartTime() { @@ -265,6 +272,7 @@ public class TransformTool extends Configured implements Tool { options.addOption(PARTIAL_TRANSFORM_OPTION); options.addOption(START_TIME_OPTION); options.addOption(END_TIME_OPTION); + options.addOption(SPLIT_SIZE_OPTION); options.addOption(FIX_UNVERIFIED_TRANSFORM_OPTION); options.addOption(FORCE_CUTOVER_OPTION); options.addOption(USE_NEW_TABLE_AS_SOURCE_OPTION); @@ -350,6 +358,11 @@ public class TransformTool extends Configured implements Tool { indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) { + splitSize = Integer.parseInt(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt())); + } else { + splitSize = PhoenixTTLTool.DEFAULT_MAPPER_SPLIT_SIZE; + } logicalTableName = dataTable; logicalParentName = null; if (!Strings.isNullOrEmpty(indexTable)) { @@ -574,12 +587,28 @@ public class TransformTool extends Configured implements Tool { fs = outputPath.getFileSystem(configuration); fs.delete(outputPath, true); } + PhoenixConfigurationUtil.setMultiInputMapperSplitSize(configuration, splitSize); + this.job = Job.getInstance(getConf(), jobName); job.setJarByClass(TransformTool.class); job.setPriority(this.jobPriority); - PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class, - oldTableWithSchema, ""); + boolean hasChildViews = false; + try (Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices().getTable( + SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES, configuration).toBytes())) { + byte[] tenantIdBytes = Strings.isNullOrEmpty(tenantId) ? null : tenantId.getBytes(); + byte[] schemaNameBytes = Strings.isNullOrEmpty(schemaName) ? null : schemaName.getBytes(); + hasChildViews = ViewUtil.hasChildViews(hTable, tenantIdBytes, schemaNameBytes, + pOldTable.getTableName().getBytes(), HConstants.LATEST_TIMESTAMP); + } + + if (hasChildViews && Strings.isNullOrEmpty(tenantId)) { + PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixTransformWithViewsInputFormat.class, + oldTableWithSchema, ""); + } else { + PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class, + oldTableWithSchema, ""); + } if (outputPath != null) { FileOutputFormat.setOutputPath(job, outputPath); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 4f3e729..ab0ae51 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -896,6 +896,13 @@ public class MetaDataClient { MetaDataMutationResult parentResult = updateCache(connection.getTenantId(), parentSchemaName, tableName, false, resolvedTimestamp); PTable parentTable = parentResult.getTable(); + if (parentResult.getMutationCode() == MutationCode.TABLE_NOT_FOUND || parentTable == null) { + // Try once more with different tenant id (connection can be global but view could be tenant + parentResult = + updateCache(table.getTenantId(), parentSchemaName, tableName, false, + resolvedTimestamp); + parentTable = parentResult.getTable(); + } if (LOGGER.isTraceEnabled()) { LOGGER.trace("addColumnsAndIndexesFromAncestors parent logical name " + table.getBaseTableLogicalName().getString() + " parent name " + table.getParentName().getString() + " tableName=" + table.getName()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java index 1d81911..6cd7247 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java @@ -61,6 +61,7 @@ import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -463,11 +464,16 @@ public class Transform { // We need to update the columns's qualifiers as well mutateColumns(connection.unwrap(PhoenixConnection.class), pOldTable, pNewTable); + HashMap<String, PColumn> columnMap = new HashMap<>(); + for (PColumn column : pNewTable.getColumns()) { + columnMap.put(column.getName().getString(), column); + } + // Also update view column qualifiers for (TableInfo view : childViewsResult.getLinks()) { PTable pView = PhoenixRuntime.getTable(connection, view.getTenantId()==null? null: Bytes.toString(view.getTenantId()) , SchemaUtil.getTableName(view.getSchemaName(), view.getTableName())); - mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable); + mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable, columnMap); } } connection.commit(); @@ -612,13 +618,16 @@ public class Transform { } } - private static void mutateViewColumns(PhoenixConnection connection, PTable pView, PTable pNewTable) throws SQLException { - if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) { + public static PTable getTransformedView(PTable pOldView, PTable pNewTable, HashMap<String, PColumn> columnMap, boolean withDerivedColumns) throws SQLException { + List<PColumn> newColumns = new ArrayList<>(); + PTable pNewView = null; + if (pOldView.getEncodingScheme() != pNewTable.getEncodingScheme()) { Short nextKeySeq = 0; PTable.EncodedCQCounter cqCounterToUse = pNewTable.getEncodedCQCounter(); String defaultColumnFamily = pNewTable.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(pNewTable.getDefaultFamilyName().getString()) ? pNewTable.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY; - for (PColumn column : pView.getColumns()) { + + for (PColumn column : pOldView.getColumns()) { boolean isPk = SchemaUtil.isPKColumn(column); Short keySeq = isPk ? ++nextKeySeq : null; if (isPk) { @@ -630,15 +639,18 @@ public class Transform { } else { familyName = defaultColumnFamily; } - int encodedCQ = pView.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq) : cqCounterToUse.getNextQualifier(familyName); - if (!pView.isAppendOnlySchema()) { - cqCounterToUse.increment(familyName); - } - + int encodedCQ = pOldView.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq) : cqCounterToUse.getNextQualifier(familyName); byte[] colQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(column.getName().getString(), encodedCQ, pNewTable, isPk); + if (columnMap.containsKey(column.getName().getString())) { + colQualifierBytes = columnMap.get(column.getName().getString()).getColumnQualifierBytes(); + } else { + if (!column.isDerived()) { + cqCounterToUse.increment(familyName); + } + } - if (column.isDerived()) { + if (!withDerivedColumns && column.isDerived()) { // Don't need to add/change derived columns continue; } @@ -648,8 +660,37 @@ public class Transform { , column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic(), colQualifierBytes, EnvironmentEdgeManager.currentTimeMillis()); - String tenantId = pView.getTenantId() == null? null:pView.getTenantId().getString(); - addColumnMutation(connection, tenantId, pView.getSchemaName()==null?null:pView.getSchemaName().getString() + newColumns.add(newCol); + if (!columnMap.containsKey(newCol.getName().getString())) { + columnMap.put(newCol.getName().getString(), newCol) ; + } + } + + pNewView = PTableImpl.builderWithColumns(pOldView, newColumns) + .setQualifierEncodingScheme(pNewTable.getEncodingScheme()) + .setImmutableStorageScheme(pNewTable.getImmutableStorageScheme()) + .setPhysicalNames( + Collections.singletonList(SchemaUtil.getPhysicalHBaseTableName( + pNewTable.getSchemaName(), pNewTable.getTableName(), pNewTable.isNamespaceMapped()))) + .build(); + } else { + // Have to change this per transform type + } + return pNewView; + } + + private static void mutateViewColumns(PhoenixConnection connection, PTable pView, PTable pNewTable, HashMap<String, PColumn> columnMap) throws SQLException { + if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) { + Short nextKeySeq = 0; + PTable newView = getTransformedView(pView, pNewTable, columnMap,false); + for (PColumn newCol : newView.getColumns()) { + boolean isPk = SchemaUtil.isPKColumn(newCol); + Short keySeq = isPk ? ++nextKeySeq : null; + if (isPk) { + continue; + } + String tenantId = pView.getTenantId() == null ? null : pView.getTenantId().getString(); + addColumnMutation(connection, tenantId, pView.getSchemaName() == null ? null : pView.getSchemaName().getString() , pView.getTableName().getString(), newCol, pView.getParentTableName() == null ? null : pView.getParentTableName().getString() , pView.getPKName() == null ? null : pView.getPKName().getString(), keySeq, pView.getBucketNum() != null); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java index 8d08b10..c071291 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java @@ -90,6 +90,8 @@ public class TransformMaintainer extends IndexMaintainer { private int nOldTableCFs; private boolean newTableWALDisabled; private boolean newTableImmutableRows; + private Set<ColumnReference> allColumns; + // Transient state private final boolean isOldTableSalted; private final RowKeySchema oldTableRowKeySchema; @@ -133,9 +135,13 @@ public class TransformMaintainer extends IndexMaintainer { } public Set<ColumnReference> getAllColumns() { - return new HashSet<>(); + return allColumns; } + public Set<ColumnReference> getCoveredColumns() { + return coveredColumnsMap.keySet(); + } + private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) { this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null); this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable(); @@ -250,6 +256,8 @@ public class TransformMaintainer extends IndexMaintainer { * Init calculated state reading/creating */ private void initCachedState() { + this.allColumns = Sets.newLinkedHashSetWithExpectedSize(newTableExpressions.size() + coveredColumnsMap.size()); + byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst(); byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst(); newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);