This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
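In brief, the end-to-end flow this patch enables, as exercised by CDCMiscIT.testSelectCDC in the diff below. This is a sketch, not part of the commit: the connection URL, table name, and CDC name are illustrative.

    import com.google.gson.Gson;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.HashMap;

    public class CdcQuickStart {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE t (k INTEGER PRIMARY KEY, v1 INTEGER)");
                stmt.execute("UPSERT INTO t (k, v1) VALUES (1, 100)");
                conn.commit();
                // A CDC object is backed by an uncovered index on PHOENIX_ROW_TIMESTAMP()
                // (named via CDCUtil.getCDCIndexName(); see MetaDataClient below).
                stmt.execute("CREATE CDC t_cdc ON t(PHOENIX_ROW_TIMESTAMP())");
                // Each change row exposes the changed column values as JSON in the
                // "CDC JSON" column, materialized by CDCGlobalIndexRegionScanner.
                try (ResultSet rs = stmt.executeQuery(
                        "SELECT PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM t_cdc")) {
                    while (rs.next()) {
                        HashMap<?, ?> change = new Gson().fromJson(rs.getString(3), HashMap.class);
                        System.out.println(rs.getTimestamp(1) + " k=" + rs.getInt(2) + " " + change);
                    }
                }
            }
        }
    }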
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push: new 7420443d84 PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766) 7420443d84 is described below commit 7420443d84ce6786e2594fd18f5f02c7b742197f Author: Hari Krishna Dara <harid...@gmail.com> AuthorDate: Sat Dec 23 21:59:19 2023 +0530 PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766) --- phoenix-core/pom.xml | 5 + .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 148 +++++++++++++- .../index/UncoveredGlobalIndexRegionScannerIT.java | 8 +- .../apache/phoenix/compile/ProjectionCompiler.java | 2 +- .../org/apache/phoenix/compile/QueryCompiler.java | 12 ++ .../apache/phoenix/compile/StatementContext.java | 10 +- .../phoenix/compile/TupleProjectionCompiler.java | 18 +- .../coprocessor/BaseScannerRegionObserver.java | 7 +- .../coprocessor/CDCGlobalIndexRegionScanner.java | 226 +++++++++++++++++++++ .../phoenix/coprocessor/MetaDataEndpointImpl.java | 5 +- .../coprocessor/UncoveredIndexRegionScanner.java | 29 ++- .../org/apache/phoenix/execute/BaseQueryPlan.java | 30 ++- .../phoenix/iterate/RegionScannerFactory.java | 50 +++-- .../phoenix/iterate/TableResultIterator.java | 2 +- .../apache/phoenix/optimize/QueryOptimizer.java | 18 +- .../java/org/apache/phoenix/parse/HintNode.java | 177 ++++++++-------- .../java/org/apache/phoenix/schema/ColumnRef.java | 3 +- .../org/apache/phoenix/schema/MetaDataClient.java | 11 +- .../java/org/apache/phoenix/schema/PTableImpl.java | 4 +- .../main/java/org/apache/phoenix/util/CDCUtil.java | 30 +++ .../apache/phoenix/util/EncodedColumnsUtil.java | 8 +- .../java/org/apache/phoenix/util/ScanUtil.java | 137 ++++++++++++- phoenix-pherf/pom.xml | 2 - pom.xml | 6 + 24 files changed, 783 insertions(+), 165 deletions(-) diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 7827a065f0..b1534225ca 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -249,6 +249,11 @@ </build> <dependencies> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <!-- shaded thirdparty dependencies --> <dependency> <groupId>org.apache.phoenix.thirdparty</groupId> diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java index 7e081419e2..d1f04c02cc 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java @@ -17,7 +17,10 @@ */ package org.apache.phoenix.end2end; +import com.google.gson.Gson; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableProperty; @@ -27,13 +30,16 @@ import org.apache.phoenix.util.SchemaUtil; import org.junit.Test; import org.junit.experimental.categories.Category; +import javax.xml.transform.Result; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; @@ -60,7 +66,8 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { } } - private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> 
expIncludeScopes) + private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes, + String datatableName) throws SQLException { Properties props = new Properties(); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -68,6 +75,9 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { assertEquals(expIncludeScopes, table.getCDCIncludeScopes()); assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table)); assertNull(table.getIndexState()); // Index state should be null for CDC. + assertNull(table.getIndexType()); // This is not an index. + assertEquals(datatableName, table.getParentName().getString()); + assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString()); } private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException { @@ -117,7 +127,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { try { conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +"(ROUND(v1))"); - fail("Expected to fail due to non-timestamp expression in the index PK"); + fail("Expected to fail due to non-date expression in the index PK"); } catch (SQLException e) { assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(), e.getErrorCode()); @@ -126,7 +136,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { try { conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +"(v1)"); - fail("Expected to fail due to non-timestamp column in the index PK"); + fail("Expected to fail due to non-date column in the index PK"); } catch (SQLException e) { assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(), e.getErrorCode()); @@ -152,13 +162,13 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { "(v2) INCLUDE (pre, post) INDEX_TYPE=g"); assertCDCState(conn, cdcName, "PRE,POST", 3); assertPTable(cdcName, new HashSet<>( - Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST))); + Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName); cdcName = generateUniqueName(); conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName + "(v2) INDEX_TYPE=l"); assertCDCState(conn, cdcName, null, 2); - assertPTable(cdcName, null); + assertPTable(cdcName, null, tableName); String viewName = generateUniqueName(); conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + @@ -209,7 +219,6 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { assertEquals("K", cdcPkColumns.get(2).getName().getString()); } - @Test public void testDropCDC () throws SQLException { Properties props = new Properties(); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -219,11 +228,6 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { + " v2 DATE)"); String cdcName = generateUniqueName(); - String cdc_sql = "CREATE CDC " + cdcName - + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; - conn.createStatement().execute(cdc_sql); - assertCDCState(conn, cdcName, null, 3); - String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(drop_cdc_sql); @@ -269,4 +273,126 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { } } + private void assertResultSet(ResultSet rs) throws Exception{ + Gson gson = new Gson(); + assertEquals(true, rs.next()); + assertEquals(1, rs.getInt(2)); + assertEquals(new HashMap(){{put("V1", 100d);}}, gson.fromJson(rs.getString(3), + HashMap.class)); + assertEquals(true, 
rs.next()); + assertEquals(2, rs.getInt(2)); + assertEquals(new HashMap(){{put("V1", 200d);}}, gson.fromJson(rs.getString(3), + HashMap.class)); + assertEquals(true, rs.next()); + assertEquals(1, rs.getInt(2)); + assertEquals(new HashMap(){{put("V1", 101d);}}, gson.fromJson(rs.getString(3), + HashMap.class)); + assertEquals(false, rs.next()); + rs.close(); + } + + @Test + public void testSelectCDC() throws Exception { + Properties props = new Properties(); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.put("hbase.client.scanner.timeout.period", "6000000"); + props.put("phoenix.query.timeoutMs", "6000000"); + props.put("zookeeper.session.timeout", "6000000"); + props.put("hbase.rpc.timeout", "6000000"); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)"); + conn.commit(); + Thread.sleep(1000); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); + conn.commit(); + String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; + conn.createStatement().execute(cdc_sql); + assertCDCState(conn, cdcName, null, 3); + // NOTE: To debug the query execution, add the below condition where you need a breakpoint. + // if (<table>.getTableName().getString().equals("N000002") || + // <table>.getTableName().getString().equals("__CDC__N000002")) { + // "".isEmpty(); + // } + assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName)); + assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName + + " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()")); + assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " + + "FROM " + cdcName)); + assertResultSet(conn.createStatement().executeQuery("SELECT " + + "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName)); + + HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{ + put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1}); + put("SELECT * FROM " + cdcName + + " ORDER BY k ASC", new int [] {1, 1, 2}); + put("SELECT * FROM " + cdcName + + " ORDER BY k DESC", new int [] {2, 1, 1}); + put("SELECT * FROM " + cdcName + + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1}); + }}; + for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) { + try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) { + for (int k: testQuery.getValue()) { + assertEquals(true, rs.next()); + assertEquals(k, rs.getInt(2)); + } + assertEquals(false, rs.next()); + } + } + + try (ResultSet rs = conn.createStatement().executeQuery( + "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) { + assertEquals(false, rs.next()); + } + try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) { + assertEquals(true, rs.next()); + assertEquals("abc", rs.getString(1)); + assertEquals(true, rs.next()); + assertEquals("abc", rs.getString(1)); + assertEquals(false, rs.next()); + } + } + + // Temporary test case used as a reference for debugging and comparing against the CDC query. 
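+ // It runs an equivalent query through a plain UNCOVERED INDEX on PHOENIX_ROW_TIMESTAMP()
+ // over the same data, so differences from testSelectCDC isolate the CDC-specific scanner path.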
+ @Test + public void testSelectUncoveredIndex() throws Exception { + Properties props = new Properties(); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.put("hbase.client.scanner.timeout.period", "6000000"); + props.put("phoenix.query.timeoutMs", "6000000"); + props.put("zookeeper.session.timeout", "6000000"); + props.put("hbase.rpc.timeout", "6000000"); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" + + " (1, 100)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" + + " (2, 200)"); + conn.commit(); + String indexName = generateUniqueName(); + String index_sql = "CREATE UNCOVERED INDEX " + indexName + + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; + conn.createStatement().execute(index_sql); + //ResultSet rs = + // conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName + + // " " + indexName + ") */ * FROM " + tableName); + ResultSet rs = + conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName + + " " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName); + assertEquals(true, rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(100, rs.getInt(2)); + assertEquals(true, rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals(200, rs.getInt(2)); + assertEquals(false, rs.next()); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java index e29284a951..aae8e48108 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java @@ -175,7 +175,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest { String timeZoneID = Calendar.getInstance().getTimeZone().getID(); // Write a query to get the val2 = 'bc' with a time range query String query = "SELECT"+ (uncovered ? 
" " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ") - + "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + + "val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName + " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after @@ -186,8 +186,10 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest { assertTrue(rs.next()); assertEquals("bc", rs.getString(1)); assertEquals("bcd", rs.getString(2)); + assertEquals("bcd", rs.getString(2)); assertTrue(rs.getTimestamp(3).after(before)); assertTrue(rs.getTimestamp(3).before(after)); + assertEquals("bcde", rs.getString(4)); assertFalse(rs.next()); // Count the number of index rows rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName); @@ -206,10 +208,11 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest { assertEquals("bcd", rs.getString(2)); assertTrue(rs.getTimestamp(3).after(before)); assertTrue(rs.getTimestamp(3).before(after)); + assertEquals("bcde", rs.getString(4)); assertFalse(rs.next()); // Write a time range query to get the last row with val2 ='bc' query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ") - +"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + + +"val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName + " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; // Verify that we will read from the index table @@ -219,6 +222,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest { assertEquals("bc", rs.getString(1)); assertEquals("ccc", rs.getString(2)); assertTrue(rs.getTimestamp(3).after(after)); + assertEquals("cccc", rs.getString(4)); assertFalse(rs.next()); // Verify that we can execute the same query without using the index String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index 9e6b90cded..8dc6678f24 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -237,7 +237,7 @@ public class ProjectionCompiler { ColumnRef ref = null; try { indexColumn = index.getColumnForColumnName(indexColName); - //TODO could should we do this more efficiently than catching the expcetion ? + // TODO: Should we do this more efficiently than catching the exception ? 
} catch (ColumnNotFoundException e) { if (IndexUtil.shouldIndexBeUsedForUncoveredQuery(tableRef)) { //Projected columns have the same name as in the data table diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 41459dc133..9981e31219 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.phoenix.parse.HintNode; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.thirdparty.com.google.common.base.Optional; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; @@ -82,6 +85,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ParseNodeUtil; import org.apache.phoenix.util.ParseNodeUtil.RewriteResult; @@ -698,6 +702,14 @@ public class QueryCompiler { if (projectedTable != null) { context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes())); } + + if (context.getCurrentTable().getTable().getType() == PTableType.CDC) { + // This will get the data column added to the context so that projection can get + // serialized.. + context.getDataColumnPosition( + context.getCurrentTable().getTable().getColumnForColumnName( + QueryConstants.CDC_JSON_COL_NAME)); + } } ColumnResolver resolver = context.getResolver(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index f795254a36..c12cd62d56 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TimeZone; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -46,6 +45,7 @@ import org.apache.phoenix.schema.types.PDate; import org.apache.phoenix.schema.types.PTime; import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.NumberUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -84,6 +84,7 @@ public class StatementContext { private QueryLogger queryLogger; private boolean isClientSideUpsertSelect; private boolean isUncoveredIndex; + private String cdcIncludeScopes; public StatementContext(PhoenixStatement statement) { this(statement, new Scan()); @@ -378,4 +379,11 @@ public class StatementContext { return retrying; } } + public String getEncodedCdcIncludeScopes() { + return cdcIncludeScopes; + } + + public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) { + this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes); + } } diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 99392f6d29..faa940a5e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -16,12 +16,15 @@ * limitations under the License. */ package org.apache.phoenix.compile; +import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME; +import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; @@ -61,18 +64,21 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; public class TupleProjectionCompiler { public static final PName PROJECTED_TABLE_SCHEMA = PNameFactory.newName("."); + public static final EnumSet<PTableType> PROJECTED_TABLE_TYPES = EnumSet.of(PTableType.TABLE, + PTableType.INDEX, PTableType.VIEW, PTableType.CDC); private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory(); public static PTable createProjectedTable(SelectStatement select, StatementContext context) throws SQLException { Preconditions.checkArgument(!select.isJoin()); // Non-group-by or group-by aggregations will create its own projected result. - if (select.getInnerSelectStatement() != null + if (select.getInnerSelectStatement() != null || select.getFrom() == null || select.isAggregate() || select.isDistinct() - || (context.getResolver().getTables().get(0).getTable().getType() != PTableType.TABLE - && context.getResolver().getTables().get(0).getTable().getType() != PTableType.INDEX && context.getResolver().getTables().get(0).getTable().getType() != PTableType.VIEW)) + || ! PROJECTED_TABLE_TYPES.contains( + context.getResolver().getTables().get(0).getTable().getType())) { return null; + } List<PColumn> projectedColumns = new ArrayList<PColumn>(); boolean isWildcard = false; @@ -86,7 +92,7 @@ public class TupleProjectionCompiler { if (node instanceof WildcardParseNode) { if (((WildcardParseNode) node).isRewrite()) { TableRef parentTableRef = FromCompiler.getResolver( - NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(), + NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(), table.getParentTableName().getString())), context.getConnection()).resolveTable( table.getSchemaName().getString(), table.getParentTableName().getString()); @@ -162,8 +168,8 @@ public class TupleProjectionCompiler { // add IndexUncoveredDataColumnRef position = projectedColumns.size() + (hasSaltingColumn ? 
1 : 0); for (IndexUncoveredDataColumnRef sourceColumnRef : visitor.indexColumnRefSet) { - PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), - sourceColumnRef.getColumn().getFamilyName(), position++, + PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), + sourceColumnRef.getColumn().getFamilyName(), position++, sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes()); projectedColumns.add(column); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 7493acceac..d76046d3b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -150,7 +150,12 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName"; public static final String INDEX_ROW_KEY = "_IndexRowKey"; public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable"; - + public static final String CDC_DATA_TABLE_NAME = "_CdcDataTableName"; + public static final String CDC_JSON_COL_QUALIFIER = "_CdcJsonColumn_Qualifier"; + public static final String CDC_INCLUDE_SCOPES = "_CdcIncludeScopes"; + public static final String DATA_COL_QUALIFIER_TO_NAME_MAP = "_DataColQualToNameMap"; + public static final String DATA_COL_QUALIFIER_TO_TYPE_MAP = "_DataColQualToTypeMap"; + public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1); public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2); // In case of Index Write failure, we need to determine that Index mutation diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java new file mode 100644 index 0000000000..178bb1d705 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import com.google.gson.Gson; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP; + +public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner { + private static final Logger LOGGER = + LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class); + + private Map<ImmutableBytesPtr, String> dataColQualNameMap; + private Map<ImmutableBytesPtr, PDataType> dataColQualTypeMap; + // Map<dataRowKey: Map<TS: Map<qualifier: Cell>>> + private Map<ImmutableBytesPtr, Map<Long, Map<ImmutableBytesPtr, Cell>>> dataRowChanges = + new HashMap<>(); + + public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, + final Region region, + final Scan scan, + final RegionCoprocessorEnvironment env, + final Scan dataTableScan, + final TupleProjector tupleProjector, + final IndexMaintainer indexMaintainer, + final byte[][] viewConstants, + final ImmutableBytesWritable ptr, + final long pageSizeMs, + final long queryLimit) throws IOException { + super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, + viewConstants, ptr, pageSizeMs, queryLimit); + CDCUtil.initForRawScan(dataTableScan); + dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( + scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); + dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( + scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + } + + @Override + protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException { + return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); + } + + protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { + if (indexRowIterator.hasNext()) { + List<Cell> indexRow = indexRowIterator.next(); + for (Cell c: indexRow) { + if (c.getType() == Cell.Type.Put) { + result.add(c); + } + } + try { + Result dataRow = null; + if (! result.isEmpty()) { + Cell firstCell = result.get(0); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + dataRow = dataRows.get(dataRowKey); + Long indexRowTs = result.get(0).getTimestamp(); + Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( + dataRowKey); + if (changeTimeline == null) { + List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); + Collections.sort(resultCells, CellComparator.getInstance().reversed()); + List<Cell> deleteMarkers = new ArrayList<>(); + List<List<Cell>> columns = new LinkedList<>(); + Cell currentColumnCell = null; + Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( + EncodedColumnsUtil.getQualifierEncodingScheme(scan)); + List<Cell> currentColumn = null; + Set<Long> uniqueTimeStamps = new HashSet<>(); + // TODO: From CompactionScanner.formColumns(), see if this can be refactored. + for (Cell cell : resultCells) { + uniqueTimeStamps.add(cell.getTimestamp()); + if (cell.getType() != Cell.Type.Put) { + deleteMarkers.add(cell); + } + if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + emptyKV.getFirst())) { + continue; + } + if (currentColumnCell == null) { + currentColumn = new LinkedList<>(); + currentColumnCell = cell; + currentColumn.add(cell); + } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { + columns.add(currentColumn); + currentColumn = new LinkedList<>(); + currentColumnCell = cell; + currentColumn.add(cell); + } else { + currentColumn.add(cell); + } + } + if (currentColumn != null) { + columns.add(currentColumn); + } + List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( + Collectors.toList()); + // FIXME: Does this need to be Concurrent? 
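+ // Rebuild the row image at each timestamp: walk the unique timestamps in ascending
+ // order, overlay the cells that changed at that timestamp onto the running image,
+ // and snapshot the result into the per-timestamp change timeline for this data row.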
+ Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); + int[] columnPointers = new int[columns.size()]; + changeTimeline = new TreeMap<>(); + dataRowChanges.put(dataRowKey, changeTimeline); + for (Long ts : sortedTimestamps) { + for (int i = 0; i < columns.size(); ++i) { + Cell cell = columns.get(i).get(columnPointers[i]); + if (cell.getTimestamp() == ts) { + rollingRow.put(new ImmutableBytesPtr( + cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength()), + cell); + ++columnPointers[i]; + } + } + Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap(); + rowOfCells.putAll(rollingRow); + changeTimeline.put(ts, rowOfCells); + } + } + + Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs); + if (mapOfCells != null) { + Map <String, Object> rowValueMap = new HashMap<>(mapOfCells.size()); + for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) { + String colName = dataColQualNameMap.get(entry.getKey()); + Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject( + entry.getValue().getValueArray()); + rowValueMap.put(colName, colVal); + } + byte[] value = + new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(), + firstCell.getFamilyOffset(), firstCell.getFamilyLength()); + dataRow = Result.create(Arrays.asList(builder. + setRow(dataRowKey.copyBytesIfNecessary()). + setFamily(family.copyBytesIfNecessary()). + setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))). + setTimestamp(firstCell.getTimestamp()). + setValue(value). + setType(Cell.Type.Put). + build())); + } + } + if (dataRow != null && tupleProjector != null) { + IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow), + tupleProjector, ptr); + } + else { + result.clear(); + } + return true; + } catch (Throwable e) { + LOGGER.error("Exception in UncoveredIndexRegionScanner for region " + + region.getRegionInfo().getRegionNameAsString(), e); + throw e; + } + } + return false; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index f4a0dcb7d0..efdf19a364 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -86,6 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES; import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE; import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; +import static org.apache.phoenix.schema.PTableType.CDC; import static org.apache.phoenix.schema.PTableType.INDEX; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.apache.phoenix.util.SchemaUtil.getVarCharLength; @@ -1465,8 +1466,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = Lists.newArrayList(); List<PName> physicalTables = Lists.newArrayList(); - PName parentTableName = tableType == INDEX ? dataTableName : null; - PName parentSchemaName = tableType == INDEX ? schemaName : null; + PName parentTableName = tableType == INDEX || tableType == CDC ? 
dataTableName : null; + PName parentSchemaName = tableType == INDEX || tableType == CDC ? schemaName : null; PName parentLogicalName = null; EncodedCQCounter cqCounter = null; if (oldTable != null) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java index 438c014ca3..d010c33dff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java @@ -51,14 +51,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER; import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS; import static org.apache.phoenix.util.ScanUtil.getDummyResult; import static org.apache.phoenix.util.ScanUtil.isDummy; @@ -66,6 +72,7 @@ import static org.apache.phoenix.util.ScanUtil.isDummy; public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(UncoveredIndexRegionScanner.class); + /** * The states of the processing a page of index rows */ @@ -170,6 +177,11 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { protected abstract void scanDataTableRows(long startTime) throws IOException; protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + return prepareDataTableScan(dataRowKeys, false); + } + + protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys, + boolean includeMultipleVersions) throws IOException { List<KeyRange> keys = new ArrayList<>(dataRowKeys.size()); for (byte[] dataRowKey : dataRowKeys) { // If the data table scan was interrupted because of paging we retry the scan @@ -185,7 +197,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); scanRanges.initializeScan(dataScan); SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - dataScan.setFilter(new SkipScanFilter(skipScanFilter, false)); + dataScan.setFilter(new SkipScanFilter(skipScanFilter, includeMultipleVersions)); dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(Long.valueOf(pageSizeMs))); return dataScan; @@ -244,9 +256,15 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { } Cell firstCell = row.get(0); byte[] indexRowKey = firstCell.getRowArray(); - ptr.set(indexRowKey, firstCell.getRowOffset() + offset, - firstCell.getRowLength() - offset); - lastIndexRowKey = ptr.copyBytes(); + // Avoid unnecessary byte copy and garbage when the row key is what we need. 
+ if (firstCell.getRowOffset() + offset == 0 && firstCell.getRowLength() - offset == indexRowKey.length) { + lastIndexRowKey = indexRowKey; + } + else { + ptr.set(indexRowKey, firstCell.getRowOffset() + offset, + firstCell.getRowLength() - offset); + lastIndexRowKey = ptr.copyBytes(); + } indexToDataRowKeyMap.put(offset == 0 ? lastIndexRowKey : CellUtil.cloneRow(firstCell), indexMaintainer.buildDataRowKey( new ImmutableBytesWritable(lastIndexRowKey), @@ -300,7 +318,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { return false; } - private boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { + protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); result.addAll(indexRow); @@ -308,6 +326,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { byte[] indexRowKey = CellUtil.cloneRow(indexRow.get(0)); Result dataRow = dataRows.get(new ImmutableBytesPtr( indexToDataRowKeyMap.get(indexRowKey))); + if (dataRow != null) { long ts = indexRow.get(0).getTimestamp(); if (!indexMaintainer.isUncovered() diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index ded8f17c67..ca0259203a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.sql.ParameterMetaData; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -79,6 +80,7 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.thirdparty.com.google.common.base.Optional; import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; import org.apache.phoenix.trace.TracingIterator; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; @@ -314,30 +316,38 @@ public abstract class BaseQueryPlan implements QueryPlan { ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : customAnnotations.getBytes()); // Set index related scan attributes. - if (table.getType() == PTableType.INDEX) { + if (table.getType() == PTableType.INDEX || table.getType() == PTableType.CDC) { if (table.getIndexType() == IndexType.LOCAL) { ScanUtil.setLocalIndex(scan); } else if (context.isUncoveredIndex()) { ScanUtil.setUncoveredGlobalIndex(scan); } + PTable dataTable = null; Set<PColumn> dataColumns = context.getDataColumns(); // If any data columns to join back from data table are present then we set following attributes // 1. data columns to be projected and their key value schema. - // 2. index maintainer and view constants if exists to build data row key from index row key. + // 2. index maintainer and view constants if exists to build data row key from index row key. // TODO: can have an hint to skip joining back to data table, in that case if any column to // project is not present in the index then we need to skip this plan. if (!dataColumns.isEmpty()) { // Set data columns to be join back from data table. 
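+ // For a CDC query the current (CDC) table itself stands in for the data table;
+ // for a regular index the data table is resolved from the parent schema/table name.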
PTable parentTable = context.getCurrentTable().getTable(); String parentSchemaName = parentTable.getParentSchemaName().getString(); - String parentTableName = parentTable.getParentTableName().getString(); - final ParseNodeFactory FACTORY = new ParseNodeFactory(); - TableRef dataTableRef = - FromCompiler.getResolver( - FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)), - context.getConnection()).resolveTable(parentSchemaName, parentTableName); - PTable dataTable = dataTableRef.getTable(); + if (parentTable.getType() == PTableType.CDC) { + dataTable = parentTable; + } + else { + String parentTableName = parentTable.getParentTableName().getString(); + final ParseNodeFactory FACTORY = new ParseNodeFactory(); + TableRef dataTableRef = + FromCompiler.getResolver( + FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)), + context.getConnection()).resolveTable(parentSchemaName, parentTableName); + dataTable = dataTableRef.getTable(); + } + } + if (! dataColumns.isEmpty()) { // Set data columns to be join back from data table. serializeDataTableColumnsToJoin(scan, dataColumns, dataTable); KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns); @@ -579,4 +589,4 @@ public abstract class BaseQueryPlan implements QueryPlan { ResultIterator iterator = iterator(); iterator.close(); } -} \ No newline at end of file +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 7877e71ca3..171108f87f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -18,10 +18,12 @@ package org.apache.phoenix.iterate; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME; import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import org.apache.hadoop.hbase.CellUtil; +import org.apache.phoenix.coprocessor.CDCGlobalIndexRegionScanner; import org.apache.phoenix.coprocessor.UncoveredGlobalIndexRegionScanner; import org.apache.phoenix.coprocessor.UncoveredLocalIndexRegionScanner; import org.apache.phoenix.schema.KeyValueSchema; @@ -133,8 +135,7 @@ public abstract class RegionScannerFactory { long extraLimit = -1; { - // for indexes construct the row filter for uncovered columns if it exists - if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) { + if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) { byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER); if (expBytes == null) { // For older clients @@ -166,22 +167,24 @@ public abstract class RegionScannerFactory { PTable.ImmutableStorageScheme storageScheme = indexMaintainer.getIndexStorageScheme(); Scan dataTableScan = new Scan(); - if (dataColumns != null) { - for (int i = 0; i < dataColumns.length; i++) { - if (storageScheme == - PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { - dataTableScan.addFamily(dataColumns[i].getFamily()); - } else { - dataTableScan.addColumn(dataColumns[i].getFamily(), - dataColumns[i].getQualifier()); + if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) { + if (dataColumns != null) { + for (int i = 0; i < dataColumns.length; i++) { + if (storageScheme == + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + 
dataTableScan.addFamily(dataColumns[i].getFamily()); + } else { + dataTableScan.addColumn(dataColumns[i].getFamily(), + dataColumns[i].getQualifier()); + } + } + } else if (indexMaintainer.isUncovered()) { + // Indexed columns and the columns in index where clause should also be added + // to the data columns to scan for uncovered global indexes. This is required + // to verify the index row against the data table row. + for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) { + dataTableScan.addColumn(column.getFamily(), column.getQualifier()); } - } - } else if (indexMaintainer.isUncovered()) { - // Indexed columns and the columns in index where clause should also be added - // to the data columns to scan for uncovered global indexes. This is required - // to verify the index row against the data table row. - for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) { - dataTableScan.addColumn(column.getFamily(), column.getQualifier()); } } if (ScanUtil.isLocalIndex(scan)) { @@ -189,9 +192,16 @@ public abstract class RegionScannerFactory { dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, offset, actualStartKey, extraLimit); } else { - s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, - dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, - pageSizeMs, extraLimit); + if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) { + s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, + dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, + pageSizeMs, extraLimit); + } + else { + s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, + dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, + pageSizeMs, extraLimit); + } } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 9087411c4f..a38273d97e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -152,7 +152,7 @@ public class TableResultIterator implements ResultIterator { .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); this.isMapReduceContext = isMapReduceContext; this.maxQueryEndTime = maxQueryEndTime; - ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection()); + ScanUtil.setScanAttributesForClient(scan, table, plan.getContext()); } // Constructors without maxQueryEndTime to maintain API compatibility for phoenix-connectors diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 504ae47e91..5d81f0b585 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -19,6 +19,7 @@ package org.apache.phoenix.optimize; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -72,6 +73,7 @@ import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; +import 
org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ParseNodeUtil; import org.apache.phoenix.util.ParseNodeUtil.RewriteResult; @@ -216,6 +218,20 @@ public class QueryOptimizer { && stopAtBestPlan && dataPlan.isApplicable()) { return Collections.<QueryPlan> singletonList(dataPlan); } + + PTable table = dataPlan.getTableRef().getTable(); + // TODO: Need to handle CDC hints. + if (table.getType() == PTableType.CDC) { + Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes(); + String cdcHint = select.getHint().getHint(Hint.INCLUDE); + if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) { + cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1, + cdcHint.length() - 1)); + } + dataPlan.getContext().setCDCIncludeScopes(cdcIncludeScopes); + return Arrays.asList(dataPlan); + } + List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes()); if (dataPlan.isApplicable() && (indexes.isEmpty() || dataPlan.isDegenerate() @@ -236,8 +252,8 @@ public class QueryOptimizer { targetColumns = targetDatums; } - SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef())); List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size()); + SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef())); plans.add(dataPlan); QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans); if (hintedPlan != null) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java index baba60bc91..a80b092956 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java @@ -17,9 +17,14 @@ */ package org.apache.phoenix.parse; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; @@ -36,9 +41,11 @@ public class HintNode { public static final char SEPARATOR = ' '; public static final String PREFIX = "("; public static final String SUFFIX = ")"; - // Split on whitespace and parenthesis, keeping the parenthesis in the token array - private static final String SPLIT_REGEXP = "\\s+|((?<=\\" + PREFIX + ")|(?=\\" + PREFIX + "))|((?<=\\" + SUFFIX + ")|(?=\\" + SUFFIX + "))"; - + // Each hint is of the generic syntax hintWord(hintArgs) where hintArgs in parent are optional. + private static final Pattern HINT_PATTERN = Pattern.compile( + "(?<hintWord>\\w+)\\s*(?:\\s*\\(\\s*(?<hintArgs>[^)]+)\\s*\\))?"); + private static final Pattern HINT_ARG_PATTERN = Pattern.compile("(?<hintArg>\"[^\"]+\"|\\S+)"); + public enum Hint { /** * Forces a range scan to be used to process the query. @@ -53,62 +60,62 @@ public class HintNode { */ NO_CHILD_PARENT_JOIN_OPTIMIZATION, /** - * Prevents the usage of indexes, forcing usage - * of the data table for a query. - */ - NO_INDEX, - /** - * Hint of the form {@code INDEX(<table_name> <index_name>...) } - * to suggest usage of the index if possible. The first - * usable index in the list of indexes will be choosen. 
- * Table and index names may be surrounded by double quotes - * if they are case sensitive. - */ - INDEX, - /** - * All things being equal, use the data table instead of - * the index table when optimizing. - */ - USE_DATA_OVER_INDEX_TABLE, - /** - * All things being equal, use the index table instead of - * the data table when optimizing. - */ - USE_INDEX_OVER_DATA_TABLE, - /** - * Avoid caching any HBase blocks loaded by this query. - */ - NO_CACHE, - /** - * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm. - */ - USE_SORT_MERGE_JOIN, - /** - * Persist the RHS results of a hash join. - */ - USE_PERSISTENT_CACHE, - /** - * Avoid using star-join optimization. Used for broadcast join (hash join) only. - */ - NO_STAR_JOIN, - /** - * Avoid using the no seek optimization. When there are many columns which are not selected coming in between 2 - * selected columns and/or versions of columns, this should be used. - */ - SEEK_TO_COLUMN, - /** - * Avoid seeks to select specified columns. When there are very less number of columns which are not selected in - * between 2 selected columns this will be give better performance. - */ - NO_SEEK_TO_COLUMN, - /** - * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation. - */ - SMALL, - /** - * Enforces a serial scan. - */ - SERIAL, + * Prevents the usage of indexes, forcing usage + * of the data table for a query. + */ + NO_INDEX, + /** + * Hint of the form {@code INDEX(<table_name> <index_name>...) } + * to suggest usage of the index if possible. The first + * usable index in the list of indexes will be choosen. + * Table and index names may be surrounded by double quotes + * if they are case sensitive. + */ + INDEX, + /** + * All things being equal, use the data table instead of + * the index table when optimizing. + */ + USE_DATA_OVER_INDEX_TABLE, + /** + * All things being equal, use the index table instead of + * the data table when optimizing. + */ + USE_INDEX_OVER_DATA_TABLE, + /** + * Avoid caching any HBase blocks loaded by this query. + */ + NO_CACHE, + /** + * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm. + */ + USE_SORT_MERGE_JOIN, + /** + * Persist the RHS results of a hash join. + */ + USE_PERSISTENT_CACHE, + /** + * Avoid using star-join optimization. Used for broadcast join (hash join) only. + */ + NO_STAR_JOIN, + /** + * Avoid using the no seek optimization. When there are many columns which are not selected coming in between 2 + * selected columns and/or versions of columns, this should be used. + */ + SEEK_TO_COLUMN, + /** + * Avoid seeks to select specified columns. When there are very less number of columns which are not selected in + * between 2 selected columns this will be give better performance. + */ + NO_SEEK_TO_COLUMN, + /** + * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation. + */ + SMALL, + /** + * Enforces a serial scan. + */ + SERIAL, /** * Enforces a forward scan. */ @@ -121,7 +128,13 @@ public class HintNode { /** * Do not use server merge for hinted uncovered indexes */ - NO_INDEX_SERVER_MERGE + NO_INDEX_SERVER_MERGE, + + /** + * Override the default CDC include scopes. + */ + INCLUDE, + ; }; private final Map<Hint,String> hints; @@ -160,41 +173,33 @@ public class HintNode { public HintNode(String hint) { Map<Hint,String> hints = new HashMap<Hint,String>(); - // Split on whitespace or parenthesis. 
We do not need to handle escaped or - // embedded whitespace/parenthesis, since we are parsing what will be HBase - // table names which are not allowed to contain whitespace or parenthesis. - String[] hintWords = hint.split(SPLIT_REGEXP); - for (int i = 0; i < hintWords.length; i++) { - String hintWord = hintWords[i]; - if (hintWord.isEmpty()) { - continue; - } + Matcher hintMatcher = HINT_PATTERN.matcher(hint); + while (hintMatcher.find()) { try { - Hint key = Hint.valueOf(hintWord.toUpperCase()); - String hintValue = ""; - if (i+1 < hintWords.length && PREFIX.equals(hintWords[i+1])) { - StringBuffer hintValueBuf = new StringBuffer(hint.length()); - hintValueBuf.append(PREFIX); - i+=2; - while (i < hintWords.length && !SUFFIX.equals(hintWords[i])) { - hintValueBuf.append(SchemaUtil.normalizeIdentifier(hintWords[i++])); - hintValueBuf.append(SEPARATOR); + Hint hintWord = Hint.valueOf(hintMatcher.group("hintWord").toUpperCase()); + String hintArgsStr = hintMatcher.group("hintArgs"); + List<String> hintArgs = new ArrayList<>(); + if (hintArgsStr != null) { + Matcher hintArgMatcher = HINT_ARG_PATTERN.matcher(hintArgsStr); + while (hintArgMatcher.find()) { + hintArgs.add(SchemaUtil.normalizeIdentifier(hintArgMatcher.group())); } - // Replace trailing separator with suffix - hintValueBuf.replace(hintValueBuf.length()-1, hintValueBuf.length(), SUFFIX); - hintValue = hintValueBuf.toString(); } - String oldValue = hints.put(key, hintValue); - // Concatenate together any old value with the new value - if (oldValue != null) { - hints.put(key, oldValue + hintValue); + hintArgsStr = String.join(" ", hintArgs); + hintArgsStr = hintArgsStr.equals("") ? "" : "(" + hintArgsStr + ")"; + if (hints.containsKey(hintWord)) { + // Concatenate together any old value with the new value + hints.put(hintWord, hints.get(hintWord) + hintArgsStr); + } + else { + hints.put(hintWord, hintArgsStr); } } catch (IllegalArgumentException e) { // Ignore unknown/invalid hints } } this.hints = ImmutableMap.copyOf(hints); } - + public boolean isEmpty() { return hints.isEmpty(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java index c7fdbf6a20..2618e0fa6b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java @@ -122,7 +122,8 @@ public class ColumnRef { displayName); } - if (table.getType() == PTableType.PROJECTED || table.getType() == PTableType.SUBQUERY) { + if (table.getType() == PTableType.PROJECTED || table.getType() == PTableType.SUBQUERY || + table.getType() == PTableType.CDC) { return new ProjectedColumnExpression(column, table, displayName); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 8755b0ed96..76108a6220 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1770,14 +1770,17 @@ public class MetaDataClient { new HashMap<>(), null); // TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to // protect based on CDCUtil.isACDCIndex(). - // TODO: Should we also allow PTimestamp here? 
MutationState indexMutationState; try { + // TODO: Should we also allow PTimestamp here? In fact, PTimestamp is the right type, + // but we are forced to support PDate because of the incorrect type of + // PHOENIX_ROW_TIMESTAMP (see PHOENIX-6807). indexMutationState = createIndex(indexStatement, null, PDate.INSTANCE); } catch (SQLException e) { if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) { throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName( - statement.getCdcObjName().getName()).build().buildException(); + statement.getCdcObjName().getName()).setRootCause( + e).build().buildException(); } // TODO: What about translating other index creation failures? E.g., bad TS column. throw e; @@ -1791,8 +1794,8 @@ public class MetaDataClient { ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ? statement.getTimeIdxColumn() : FACTORY.columnName(statement.getTimeIdxFunc().toString()); - columnDefs.add(FACTORY.columnDef(timeIdxCol, PTimestamp.INSTANCE.getSqlTypeName(), false, null, false, - PTimestamp.INSTANCE.getMaxLength(null), PTimestamp.INSTANCE.getScale(null), false, + columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false, + PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false, SortOrder.getDefault(), "", null, false)); pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false)); for (PColumn pcol : pkColumns) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 533f6dc2d9..0b3beb885b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -1795,7 +1795,9 @@ public class PTableImpl implements PTable { return SchemaUtil.getPhysicalHBaseTableName(schemaName, physicalTableNameColumnInSyscat, isNamespaceMapped); } - return SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped); + return SchemaUtil.getPhysicalHBaseTableName(schemaName, getType() == PTableType.CDC ?
+ PNameFactory.newName(CDCUtil.getCDCIndexName(tableName.getString())) : + tableName, isNamespaceMapped); } else { return PNameFactory.newName(physicalNames.get(0).getBytes()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java index d015aaf422..e80fd4ee09 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java @@ -18,14 +18,25 @@ package org.apache.phoenix.util; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.HashSet; +import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.StringTokenizer; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.util.StringUtils; import org.apache.phoenix.schema.PTable; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES; + public class CDCUtil { public static final String CDC_INDEX_PREFIX = "__CDC__"; public static final String CDC_INDEX_TYPE_LOCAL = "L"; @@ -82,4 +93,23 @@ public class CDCUtil { public static boolean isACDCIndex(String indexName) { return indexName.startsWith(CDC_INDEX_PREFIX); } + + public static boolean isACDCIndex(PTable indexTable) { + return isACDCIndex(indexTable.getTableName().getString()); + } + + public static Scan initForRawScan(Scan scan) { + scan.setRaw(true); + scan.readAllVersions(); + scan.setCacheBlocks(false); + Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); + if (! familyMap.isEmpty()) { + familyMap.keySet().stream().forEach(fQual -> { + if (familyMap.get(fQual) != null) { + familyMap.get(fQual).clear(); + } + }); + } + return scan; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java index f1c0b1d6e0..e02abf6720 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java @@ -84,9 +84,7 @@ public class EncodedColumnsUtil { * part is the value to use for it. */ public static Pair<byte[], byte[]> getEmptyKeyValueInfo(PTable table) { - return usesEncodedColumnNames(table) ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES, - QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES, - QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + return getEmptyKeyValueInfo(usesEncodedColumnNames(table)); } /** @@ -104,9 +102,7 @@ public class EncodedColumnsUtil { * part is the value to use for it. */ public static Pair<byte[], byte[]> getEmptyKeyValueInfo(QualifierEncodingScheme encodingScheme) { - return usesEncodedColumnNames(encodingScheme) ? 
new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES, - QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES, - QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + return getEmptyKeyValueInfo(usesEncodedColumnNames(encodingScheme)); } public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan scan) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index fdafd1f33b..f2b6b911fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -19,20 +19,32 @@ package org.apache.phoenix.util; import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY; import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; +import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME; import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,6 +68,7 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; @@ -788,7 +801,7 @@ public class ScanUtil { * that the slot at index 2 has a slot index of 2 but a row key index of 3. * To calculate the "adjusted position" index, we simply add up the number of extra slots spanned and offset * the slotPosition by that much. - * @param slotSpan the extra span per skip scan slot. corresponds to {@link ScanRanges#slotSpan} + * @param slotSpan the extra span per skip scan slot. corresponds to {@link ScanRanges#getSlotSpans()} * @param slotPosition the index of a slot in the SkipScan slots list. 
* @return the equivalent row key position in the RowKeySchema */ @@ -1167,7 +1180,8 @@ public class ScanUtil { scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ); scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES); } else { - if (table.getType() != PTableType.INDEX || !IndexUtil.isGlobalIndex(indexTable)) { + if (table.getType() != PTableType.CDC && (table.getType() != PTableType.INDEX || + !IndexUtil.isGlobalIndex(indexTable))) { return; } if (table.isTransactional() && table.getIndexType() == IndexType.UNCOVERED_GLOBAL) { @@ -1180,7 +1194,13 @@ } // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child // view. Thus, we need to recreate a PTable object with the correct table name for the rest of this code to work - if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) { + if (table.getType() == PTableType.CDC) { + indexTable = PhoenixRuntime.getTable(phoenixConnection, + CDCUtil.getCDCIndexName(table.getName().getString())); + } + else if (indexTable.getViewIndexId() != null && + indexTable.getName().getString().contains( + QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) { int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR); String indexName = indexTable.getName().getString().substring(lastIndexOf + 1); indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName); @@ -1280,7 +1300,8 @@ public class ScanUtil { } public static void setScanAttributesForClient(Scan scan, PTable table, - PhoenixConnection phoenixConnection) throws SQLException { + StatementContext context) throws SQLException { + PhoenixConnection phoenixConnection = context.getConnection(); setScanAttributesForIndexReadRepair(scan, table, phoenixConnection); setScanAttributesForPhoenixTTL(scan, table, phoenixConnection); byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME); @@ -1298,6 +1319,114 @@ } setScanAttributeForPaging(scan, phoenixConnection); + + if (table.getType() == PTableType.CDC) { + PTable dataTable = PhoenixRuntime.getTable(phoenixConnection, + SchemaUtil.getTableName(table.getSchemaName().getString(), + table.getParentTableName().getString())); + scan.setAttribute(CDC_DATA_TABLE_NAME, + table.getParentName().getBytes()); + + PColumn cdcJsonCol = table.getColumnForColumnName(CDC_JSON_COL_NAME); + scan.setAttribute(CDC_JSON_COL_QUALIFIER, cdcJsonCol.getColumnQualifierBytes()); + scan.setAttribute(CDC_INCLUDE_SCOPES, + context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8)); + CDCUtil.initForRawScan(scan); + List<PColumn> columns = dataTable.getColumns(); + Map<byte[], String> dataColQualNameMap = new HashMap<>(columns.size()); + Map<byte[], PDataType> dataColTypeMap = new HashMap<>(); + for (PColumn col: columns) { + if (col.getColumnQualifierBytes() != null) { + dataColQualNameMap.put(col.getColumnQualifierBytes(), col.getName().getString()); + dataColTypeMap.put(col.getColumnQualifierBytes(), col.getDataType()); + } + } + scan.setAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP, + serializeColumnQualifierToNameMap(dataColQualNameMap)); + scan.setAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP, + serializeColumnQualifierToTypeMap(dataColTypeMap)); + } + } + + public static byte[] serializeColumnQualifierToNameMap(Map<byte[], String> colQualNameMap) { +
ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); + try { + output.writeInt(colQualNameMap.size()); + for (Map.Entry<byte[], String> entry: colQualNameMap.entrySet()) { + output.writeInt(entry.getKey().length); + output.write(entry.getKey()); + WritableUtils.writeString(output, entry.getValue()); + } + return stream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static Map<ImmutableBytesPtr, String> deserializeColumnQualifierToNameMap( + byte[] mapBytes) { + ByteArrayInputStream stream = new ByteArrayInputStream(mapBytes); + DataInputStream input = new DataInputStream(stream); + try { + Map<ImmutableBytesPtr, String> colQualNameMap = new HashMap<>(); + int size = input.readInt(); + for (int i = 0; i < size; ++i) { + int qualLength = input.readInt(); + byte[] qualBytes = new byte[qualLength]; + int bytesRead = input.read(qualBytes); + if (bytesRead != qualLength) { + throw new IOException("Expected number of bytes: " + qualLength + " but got " + + "only: " + bytesRead); + } + String colName = WritableUtils.readString(input); + colQualNameMap.put(new ImmutableBytesPtr(qualBytes), colName); + } + return colQualNameMap; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static byte[] serializeColumnQualifierToTypeMap( + Map<byte[], PDataType> pkColNamesAndTypes) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); + try { + output.writeInt(pkColNamesAndTypes.size()); + for (Map.Entry<byte[], PDataType> entry: pkColNamesAndTypes.entrySet()) { + output.writeInt(entry.getKey().length); + output.write(entry.getKey()); + WritableUtils.writeString(output, entry.getValue().getSqlTypeName()); + } + return stream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static Map<ImmutableBytesPtr, PDataType> deserializeColumnQualifierToTypeMap( + byte[] pkColInfoBytes) { + ByteArrayInputStream stream = new ByteArrayInputStream(pkColInfoBytes); + DataInputStream input = new DataInputStream(stream); + try { + Map<ImmutableBytesPtr, PDataType> colQualTypeMap = new HashMap<>(); + int colCnt = input.readInt(); + for (int i = 0; i < colCnt; ++i) { + int qualLength = input.readInt(); + byte[] qualBytes = new byte[qualLength]; + int bytesRead = input.read(qualBytes); + if (bytesRead != qualLength) { + throw new IOException("Expected number of bytes: " + qualLength + " but got " + + "only: " + bytesRead); + } + colQualTypeMap.put(new ImmutableBytesPtr(qualBytes), + PDataType.fromSqlTypeName(WritableUtils.readString(input))); + } + return colQualTypeMap; + } catch (IOException e) { + throw new RuntimeException(e); + } } public static void setScanAttributeForPaging(Scan scan, PhoenixConnection phoenixConnection) { diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml index 981f04add0..dc3c5aec41 100644 --- a/phoenix-pherf/pom.xml +++ b/phoenix-pherf/pom.xml @@ -34,7 +34,6 @@ <!-- Versions for pherf-only dependencies --> <diffutils.version>1.2.1</diffutils.version> - <gson.version>2.9.1</gson.version> <commons-math3.version>3.3</commons-math3.version> <activation.version>1.1</activation.version> <jcabi-jdbc.version>0.15</jcabi-jdbc.version> @@ -151,7 +150,6 @@ <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>${gson.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> diff --git a/pom.xml 
b/pom.xml index 248273a4ba..0c629b1ac8 100644 --- a/pom.xml +++ b/pom.xml @@ -213,6 +213,7 @@ <protoc.arch>${os.detected.classifier}</protoc.arch> <!-- Keeping the version in sync with hbase javax.el version --> <glassfish.el.version>3.0.1-b08</glassfish.el.version> + <gson.version>2.9.1</gson.version> </properties> <build> @@ -1548,6 +1549,11 @@ <artifactId>javax.el</artifactId> <version>${glassfish.el.version}</version> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gson.version}</version> + </dependency> </dependencies> </dependencyManagement>
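
For illustration, the rewritten HintNode constructor above replaces whitespace/parenthesis token splitting with regex matching over the whole hint string. The sketch below mimics that loop. The group names "hintWord" and "hintArgs" are taken from the diff, but the actual HINT_PATTERN and HINT_ARG_PATTERN constants are defined elsewhere in HintNode.java and are not part of this diff, so the regex bodies here are assumptions:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HintParseSketch {
        // Assumed shapes, not the actual Phoenix constants: a hint word
        // optionally followed by a parenthesized argument list, with args
        // separated by whitespace or commas.
        private static final Pattern HINT_PATTERN =
                Pattern.compile("(?<hintWord>\\w+)\\s*(?:\\((?<hintArgs>[^)]*)\\))?");
        private static final Pattern HINT_ARG_PATTERN = Pattern.compile("[^\\s,]+");

        public static void main(String[] args) {
            String hint = "INDEX(t idx1 idx2) NO_CACHE INCLUDE(PRE POST)";
            Matcher hintMatcher = HINT_PATTERN.matcher(hint);
            while (hintMatcher.find()) {
                String hintWord = hintMatcher.group("hintWord").toUpperCase();
                List<String> hintArgs = new ArrayList<>();
                String hintArgsStr = hintMatcher.group("hintArgs");
                if (hintArgsStr != null) {
                    Matcher argMatcher = HINT_ARG_PATTERN.matcher(hintArgsStr);
                    while (argMatcher.find()) {
                        hintArgs.add(argMatcher.group());
                    }
                }
                // Prints: INDEX -> [t, idx1, idx2], NO_CACHE -> [], INCLUDE -> [PRE, POST]
                System.out.println(hintWord + " -> " + hintArgs);
            }
        }
    }

As in the committed constructor, repeating a hint is not an error: the parenthesized argument strings are concatenated, so INDEX(t1 i1) INDEX(t2 i2) is stored as one entry with the value (T1 I1)(T2 I2) after identifier normalization, and unknown hint words are silently ignored.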
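
CDCUtil.initForRawScan() reconfigures the index scan so that delete markers and all cell versions reach the CDC region scanner, and it clears any per-qualifier restrictions left in the scan's family map. A minimal sketch against a plain HBase Scan; the family and qualifier bytes are placeholders ("0" happens to be Phoenix's default column family name):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.CDCUtil;

    public class RawScanSketch {
        public static void main(String[] args) {
            Scan scan = new Scan();
            // A per-qualifier restriction; initForRawScan() clears it so that
            // every cell in the family (including delete markers) comes back.
            scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("1"));
            CDCUtil.initForRawScan(scan);
            System.out.println(scan.isRaw());          // true: delete markers are returned
            System.out.println(scan.getMaxVersions()); // Integer.MAX_VALUE via readAllVersions()
            System.out.println(scan.getCacheBlocks()); // false: block cache is bypassed
        }
    }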
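
In EncodedColumnsUtil, both getEmptyKeyValueInfo() overloads now delegate to a shared boolean-keyed variant. The diff shows the delegating call sites but not the new overload itself; a presumed shape, with the ternary lifted verbatim from the removed lines:

    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.query.QueryConstants;

    public class EmptyKVSketch {
        // Presumed shape of the shared overload that both call sites delegate to.
        static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean usesEncodedColumnNames) {
            return usesEncodedColumnNames
                    ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
                            QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES)
                    : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
                            QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
        }

        public static void main(String[] args) {
            // Prints "_0", the legacy (non-encoded) empty column qualifier.
            System.out.println(new String(getEmptyKeyValueInfo(false).getFirst()));
        }
    }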
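
The serialize/deserialize helpers added to ScanUtil use a simple length-prefixed layout: an entry count, then for each entry the qualifier length, the qualifier bytes, and a WritableUtils-encoded string (the column name for one map, the SQL type name for the other). A round-trip check using those methods exactly as committed; the qualifier and column name are made up:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
    import org.apache.phoenix.schema.types.PDataType;
    import org.apache.phoenix.schema.types.PInteger;
    import org.apache.phoenix.util.ScanUtil;

    public class QualifierMapRoundTrip {
        public static void main(String[] args) {
            byte[] qualifier = Bytes.toBytes(11);  // stand-in for an encoded column qualifier

            Map<byte[], String> names = new HashMap<>();
            names.put(qualifier, "V1");
            Map<ImmutableBytesPtr, String> namesBack =
                    ScanUtil.deserializeColumnQualifierToNameMap(
                            ScanUtil.serializeColumnQualifierToNameMap(names));
            System.out.println(namesBack.get(new ImmutableBytesPtr(qualifier))); // V1

            Map<byte[], PDataType> types = new HashMap<>();
            types.put(qualifier, PInteger.INSTANCE);
            Map<ImmutableBytesPtr, PDataType> typesBack =
                    ScanUtil.deserializeColumnQualifierToTypeMap(
                            ScanUtil.serializeColumnQualifierToTypeMap(types));
            // fromSqlTypeName() returns the singleton, so identity holds.
            System.out.println(typesBack.get(new ImmutableBytesPtr(qualifier))
                    == PInteger.INSTANCE); // true
        }
    }

One caveat worth noting: DataInputStream.read(byte[]) may return fewer bytes than requested, which is why the deserializers verify bytesRead; readFully() would achieve the same with less ceremony.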
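
Putting the pieces together on the client side: a CDC query can override the CDC object's default change scopes with the new INCLUDE hint, which ScanUtil.setScanAttributesForClient() serializes into the CDC_INCLUDE_SCOPES scan attribute for CDCGlobalIndexRegionScanner to read. A hedged end-to-end JDBC sketch; the connection URL, table and CDC object names are invented, the scope keywords are assumed to follow PTable.CDCChangeScope (e.g. PRE, POST, CHANGE), and the projection order is assumed:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class CdcQuerySketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // Hypothetical CDC object MY_CDC created over table MY_TABLE.
                // The INCLUDE hint overrides the default include scopes for
                // this query only.
                try (ResultSet rs = stmt.executeQuery(
                        "SELECT /*+ INCLUDE(PRE POST) */ * FROM MY_CDC")) {
                    while (rs.next()) {
                        // Assumed layout: time index column first, JSON change
                        // payload (CDC_JSON_COL_NAME) as the last column.
                        System.out.println(rs.getString(rs.getMetaData().getColumnCount()));
                    }
                }
            }
        }
    }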