This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch PHOENIX-7001-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push: new f07898f1fd PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802) f07898f1fd is described below commit f07898f1fd91f5777329b65175fd6ba5b643bbce Author: Hari Krishna Dara <harid...@gmail.com> AuthorDate: Wed Jan 24 21:59:00 2024 +0530 PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802) --- .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 231 +++++++++++++-------- phoenix-core/src/main/antlr3/PhoenixSQL.g | 10 +- .../apache/phoenix/exception/SQLExceptionCode.java | 4 +- .../apache/phoenix/exception/SQLExceptionInfo.java | 15 ++ .../org/apache/phoenix/index/IndexMaintainer.java | 2 +- .../phoenix/iterate/RegionScannerFactory.java | 60 +++--- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 11 +- .../apache/phoenix/optimize/QueryOptimizer.java | 3 +- .../apache/phoenix/parse/CreateCDCStatement.java | 15 +- .../java/org/apache/phoenix/parse/HintNode.java | 2 +- .../org/apache/phoenix/parse/ParseNodeFactory.java | 3 +- .../org/apache/phoenix/schema/MetaDataClient.java | 26 +-- .../java/org/apache/phoenix/schema/PTable.java | 6 - .../org/apache/phoenix/schema/TableProperty.java | 3 +- .../main/java/org/apache/phoenix/util/CDCUtil.java | 24 +-- .../org/apache/phoenix/parse/QueryParserTest.java | 43 ++-- .../java/org/apache/phoenix/util/CDCUtilTest.java | 21 +- 17 files changed, 245 insertions(+), 234 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java index d1f04c02cc..be35bcbf46 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java @@ -19,23 +19,27 @@ package org.apache.phoenix.end2end; import com.google.gson.Gson; import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableProperty; import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -import javax.xml.transform.Result; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,8 +52,22 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@RunWith(Parameterized.class) @Category(ParallelStatsDisabledTest.class) public class CDCMiscIT extends ParallelStatsDisabledIT { + private final boolean forView; + + public CDCMiscIT(boolean forView) { + this.forView = forView; + } + + @Parameterized.Parameters(name = "forVieiw={0}") + public static synchronized Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false}, { true } + }); + } + private void assertCDCState(Connection conn, String cdcName, String expInclude, int idxType) throws SQLException { try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " + @@ -89,19 +107,33 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { assertEquals(nbuckets, indexTable.getBucketNum()); } + private void createAndWait(Connection conn, String tableName, String cdcName, String cdc_sql) + throws Exception { + conn.createStatement().execute(cdc_sql); + IndexToolIT.runIndexTool(false, null, tableName, + "\""+CDCUtil.getCDCIndexName(cdcName)+"\""); + TestUtil.waitForIndexState(conn, CDCUtil.getCDCIndexName(cdcName), PIndexState.ACTIVE); + } + @Test - public void testCreate() throws SQLException { + public void testCreate() throws Exception { Properties props = new Properties(); Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = generateUniqueName(); conn.createStatement().execute( "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER," + " v2 DATE)"); + if (forView) { + String viewName = generateUniqueName(); + conn.createStatement().execute( + "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName); + tableName = viewName; + } String cdcName = generateUniqueName(); try { conn.createStatement().execute("CREATE CDC " + cdcName - + " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())"); + + " ON NON_EXISTENT_TABLE"); fail("Expected to fail due to non-existent table"); } catch (SQLException e) { assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode()); @@ -109,42 +141,16 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { try { conn.createStatement().execute("CREATE CDC " + cdcName - + " ON " + tableName +"(UNKNOWN_FUNCTION())"); - fail("Expected to fail due to invalid function"); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), e.getErrorCode()); - } - - try { - conn.createStatement().execute("CREATE CDC " + cdcName - + " ON " + tableName +"(NOW())"); - fail("Expected to fail due to non-deterministic function"); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX. - getErrorCode(), e.getErrorCode()); - } - - try { - conn.createStatement().execute("CREATE CDC " + cdcName - + " ON " + tableName +"(ROUND(v1))"); - fail("Expected to fail due to non-date expression in the index PK"); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(), - e.getErrorCode()); - } - - try { - conn.createStatement().execute("CREATE CDC " + cdcName - + " ON " + tableName +"(v1)"); - fail("Expected to fail due to non-date column in the index PK"); + + " ON " + tableName + " INCLUDE (abc)"); + fail("Expected to fail due to invalid INCLUDE"); } catch (SQLException e) { - assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(), + assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(), e.getErrorCode()); + assertTrue(e.getMessage().endsWith("abc")); } - String cdc_sql = "CREATE CDC " + cdcName - + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; - conn.createStatement().execute(cdc_sql); + String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; + createAndWait(conn, tableName, cdcName, cdc_sql); assertCDCState(conn, cdcName, null, 3); try { @@ -154,43 +160,32 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode()); assertTrue(e.getMessage().endsWith(cdcName)); } + conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName + - "(v2) INCLUDE (pre, post) INDEX_TYPE=g"); + " INCLUDE (pre, post) INDEX_TYPE=g"); cdcName = generateUniqueName(); - conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName + - "(v2) INCLUDE (pre, post) INDEX_TYPE=g"); + cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + + " INCLUDE (pre, post) INDEX_TYPE=g"; + createAndWait(conn, tableName, cdcName, cdc_sql); assertCDCState(conn, cdcName, "PRE,POST", 3); assertPTable(cdcName, new HashSet<>( Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName); cdcName = generateUniqueName(); - conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName + - "(v2) INDEX_TYPE=l"); + cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l"; + createAndWait(conn, tableName, cdcName, cdc_sql); assertCDCState(conn, cdcName, null, 2); assertPTable(cdcName, null, tableName); - String viewName = generateUniqueName(); - conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + - tableName); - cdcName = generateUniqueName(); - try { - conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + viewName + - "(PHOENIX_ROW_TIMESTAMP())"); - fail("Expected to fail on VIEW"); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getErrorCode(), - e.getErrorCode()); - assertTrue(e.getMessage().endsWith( - SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getMessage() + " tableType=VIEW")); + // Indexes on views don't support salt buckets and is currently silently ignored. + if (! forView) { + cdcName = generateUniqueName(); + cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " SALT_BUCKETS = 4"; + createAndWait(conn, tableName, cdcName, cdc_sql); + assertSaltBuckets(cdcName, 4); } - cdcName = generateUniqueName(); - conn.createStatement().execute("CREATE CDC " + cdcName - + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP()) SALT_BUCKETS = 4"); - assertSaltBuckets(cdcName, 4); - conn.close(); } @@ -203,8 +198,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { " (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " + "CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true"); String cdcName = generateUniqueName(); - conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName + - "(PHOENIX_ROW_TIMESTAMP())"); + conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName); PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName)); List<PColumn> idxPkColumns = indexTable.getPKColumns(); @@ -214,7 +208,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName); List<PColumn> cdcPkColumns = cdcTable.getPKColumns(); - assertEquals(" PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString()); + assertEquals("PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString()); assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString()); assertEquals("K", cdcPkColumns.get(2).getName().getString()); } @@ -260,8 +254,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER," + " v2 DATE)"); String cdcName = generateUniqueName(); - String cdc_sql = "CREATE CDC " + cdcName - + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; + String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(cdc_sql); assertCDCState(conn, cdcName, null, 3); String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName; @@ -291,28 +284,33 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { rs.close(); } + private Connection newConnection() throws SQLException { + Properties props = new Properties(); + // Use these only for debugging. + //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + //props.put("hbase.client.scanner.timeout.period", "6000000"); + //props.put("phoenix.query.timeoutMs", "6000000"); + //props.put("zookeeper.session.timeout", "6000000"); + //props.put("hbase.rpc.timeout", "6000000"); + return DriverManager.getConnection(getUrl(), props); + } + @Test public void testSelectCDC() throws Exception { - Properties props = new Properties(); - props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); - props.put("hbase.client.scanner.timeout.period", "6000000"); - props.put("phoenix.query.timeoutMs", "6000000"); - props.put("zookeeper.session.timeout", "6000000"); - props.put("hbase.rpc.timeout", "6000000"); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = newConnection(); String tableName = generateUniqueName(); conn.createStatement().execute( "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)"); conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)"); conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)"); conn.commit(); - Thread.sleep(1000); + Thread.sleep(10); conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); conn.commit(); String cdcName = generateUniqueName(); String cdc_sql = "CREATE CDC " + cdcName - + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; - conn.createStatement().execute(cdc_sql); + + " ON " + tableName; + createAndWait(conn, tableName, cdcName, cdc_sql); assertCDCState(conn, cdcName, null, 3); // NOTE: To debug the query execution, add the below condition where you need a breakpoint. // if (<table>.getTableName().getString().equals("N000002") || @@ -322,8 +320,8 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName)); assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()")); - assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " + - "FROM " + cdcName)); + assertResultSet(conn.createStatement().executeQuery("SELECT " + + "/*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName)); assertResultSet(conn.createStatement().executeQuery("SELECT " + "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName)); @@ -359,16 +357,79 @@ public class CDCMiscIT extends ParallelStatsDisabledIT { } } + @Test + public void testSelectCDCBadIncludeSpec() throws Exception { + Connection conn = newConnection(); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)"); + String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + + " ON " + tableName; + conn.createStatement().execute(cdc_sql); + try { + conn.createStatement().executeQuery("SELECT " + + "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName); + fail("Expected to fail due to invalid CDC INCLUDE hint"); + } + catch (SQLException e) { + assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getMessage().endsWith("DUMMY")); + } + } + + @Test + public void testSelectTimeRangeQueries() throws Exception { + Connection conn = newConnection(); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)"); + String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + + " ON " + tableName; + conn.createStatement().execute(cdc_sql); + Timestamp ts1 = new Timestamp(System.currentTimeMillis()); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)"); + conn.commit(); + Thread.sleep(10); + Timestamp ts2 = new Timestamp(System.currentTimeMillis()); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)"); + conn.commit(); + Thread.sleep(10); + Timestamp ts3 = new Timestamp(System.currentTimeMillis()); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); + conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2"); + Timestamp ts4 = new Timestamp(System.currentTimeMillis()); + + String sel_sql = "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND " + + "PHOENIX_ROW_TIMESTAMP() <= ?"; + Object[] testDataSets = new Object[] { + new Object[] {ts1, ts2, new int[] {1, 2}}/*, + new Object[] {ts2, ts3, new int[] {1, 3}}, + new Object[] {ts3, ts4, new int[] {1}}*/ + }; + PreparedStatement stmt = conn.prepareStatement(sel_sql); + for (int i = 0; i < testDataSets.length; ++i) { + Object[] testData = (Object[]) testDataSets[i]; + stmt.setTimestamp(1, (Timestamp) testData[0]); + stmt.setTimestamp(2, (Timestamp) testData[1]); + try (ResultSet rs = stmt.executeQuery()) { + for (int k: (int[]) testData[2]) { + assertEquals(true, rs.next()); + assertEquals(k, rs.getInt(2)); + } + assertEquals(false, rs.next()); + } + } + } + // Temporary test case used as a reference for debugging and comparing against the CDC query. @Test public void testSelectUncoveredIndex() throws Exception { - Properties props = new Properties(); - props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); - props.put("hbase.client.scanner.timeout.period", "6000000"); - props.put("phoenix.query.timeoutMs", "6000000"); - props.put("zookeeper.session.timeout", "6000000"); - props.put("hbase.rpc.timeout", "6000000"); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = newConnection(); String tableName = generateUniqueName(); conn.createStatement().execute( "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)"); diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index cc0a2f10c8..737ef4aabf 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -566,11 +566,10 @@ create_index_node returns [CreateIndexStatement ret] create_cdc_node returns [CreateCDCStatement ret] : CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name - LPAREN (tcol=column_name | tfunc=cdc_time_func) RPAREN (INCLUDE LPAREN v=cdc_change_scopes RPAREN)? (p=fam_properties)? { - ret = factory.createCDC(o, t, tcol, tfunc, v, p, ex != null, getBindCount()); + ret = factory.createCDC(o, t, v, p, ex != null, getBindCount()); } ; @@ -578,13 +577,6 @@ cdc_name returns [NamedNode ret] : name=identifier {$ret = factory.cdcName(name); } ; -cdc_time_func returns [FunctionParseNode ret] - : field=identifier LPAREN l=zero_or_more_expressions RPAREN - { - ret = factory.function(field, l); - } - ; - cdc_change_scopes returns [Set<CDCChangeScope> ret] @init { ret = new HashSet<>(); } : v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )* diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 7842e8f4c0..6785a5da28 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -360,7 +360,7 @@ public enum SQLExceptionCode { VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key" + " only if none of the parents have indexes in the parent hierarchy"), UNKNOWN_INDEX_TYPE(1098, "44A39", "Unknown INDEX type: "), - UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for INCLUDE: "), + UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for CDC INCLUDE"), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { @@ -469,8 +469,6 @@ public enum SQLExceptionCode { "Missing ENCODED_QUALIFIER."), EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a " + "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR), - INVALID_TABLE_TYPE_FOR_CDC(1152, "XCL52", - "Invalid table type for creating CDC."), /** diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java index b7babd5dc6..3e27dc75c1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java @@ -65,6 +65,7 @@ public class SQLExceptionInfo { private final int phoenixColumnSizeBytes; private final int maxPhoenixColumnSizeBytes; private final String haGroupInfo; + private final String cdcChangeScope; public static class Builder { private Throwable rootCause; @@ -83,6 +84,7 @@ public class SQLExceptionInfo { private int maxPhoenixColumnSizeBytes; private String haGroupInfo; private PTableType tableType; + private String cdcChangeScope; public Builder(SQLExceptionCode code) { this.code = code; @@ -163,6 +165,11 @@ public class SQLExceptionInfo { return this; } + public Builder setCdcChangeScope(String cdcChangeScope) { + this.cdcChangeScope = cdcChangeScope; + return this; + } + public SQLExceptionInfo build() { return new SQLExceptionInfo(this); } @@ -190,6 +197,7 @@ public class SQLExceptionInfo { maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes; phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes; haGroupInfo = builder.haGroupInfo; + cdcChangeScope = builder.cdcChangeScope; } @Override @@ -235,6 +243,9 @@ public class SQLExceptionInfo { if (haGroupInfo != null) { builder.append(" ").append(HA_GROUP_INFO).append("=").append(haGroupInfo); } + if (cdcChangeScope != null) { + builder.append(": ").append(cdcChangeScope); + } return builder.toString(); } @@ -306,4 +317,8 @@ public class SQLExceptionInfo { public String getHaGroupInfo() { return haGroupInfo; } + + public String getCdcChangeScope() { + return cdcChangeScope; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 002857b213..86f49208f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -676,7 +676,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexWhere = index.getIndexWhereExpression(connection); this.indexWhereColumns = index.getIndexWhereColumns(connection); } - this.isCDCIndex = CDCUtil.isACDCIndex(index); + this.isCDCIndex = CDCUtil.isCDCIndex(index); initCachedState(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 171108f87f..a536c6d016 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -135,7 +135,8 @@ public abstract class RegionScannerFactory { long extraLimit = -1; { - if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) { + // for indexes construct the row filter for uncovered columns if it exists + if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) { byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER); if (expBytes == null) { // For older clients @@ -160,48 +161,45 @@ public abstract class RegionScannerFactory { if (limitBytes != null) { extraLimit = Bytes.toLong(limitBytes); } - if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan) - && (tupleProjector != null - || (indexMaintainer != null && indexMaintainer.isUncovered()))) { + if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan) + && (tupleProjector != null + || (indexMaintainer != null && indexMaintainer.isUncovered()))) { - PTable.ImmutableStorageScheme storageScheme = - indexMaintainer.getIndexStorageScheme(); + PTable.ImmutableStorageScheme storageScheme = + indexMaintainer.getIndexStorageScheme(); Scan dataTableScan = new Scan(); - if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) { - if (dataColumns != null) { + if (dataColumns != null) { for (int i = 0; i < dataColumns.length; i++) { - if (storageScheme == - PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { - dataTableScan.addFamily(dataColumns[i].getFamily()); - } else { - dataTableScan.addColumn(dataColumns[i].getFamily(), - dataColumns[i].getQualifier()); - } + if (storageScheme == + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + dataTableScan.addFamily(dataColumns[i].getFamily()); + } else { + dataTableScan.addColumn(dataColumns[i].getFamily(), + dataColumns[i].getQualifier()); + } } - } else if (indexMaintainer.isUncovered()) { + } else if (indexMaintainer.isUncovered()) { // Indexed columns and the columns in index where clause should also be added // to the data columns to scan for uncovered global indexes. This is required // to verify the index row against the data table row. for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) { - dataTableScan.addColumn(column.getFamily(), column.getQualifier()); + dataTableScan.addColumn(column.getFamily(), column.getQualifier()); } - } } if (ScanUtil.isLocalIndex(scan)) { - s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env, - dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, - pageSizeMs, offset, actualStartKey, extraLimit); - } else { - if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) { - s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, - dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, - pageSizeMs, extraLimit); - } - else { - s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, + s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, - pageSizeMs, extraLimit); - } + pageSizeMs, offset, actualStartKey, extraLimit); + } else { + if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) { + s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, + dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, + pageSizeMs, extraLimit); + } else { + s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, + dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, + pageSizeMs, extraLimit); + } } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 026095a4b7..244e4d14bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -123,7 +123,6 @@ import org.apache.phoenix.log.QueryLoggerUtil; import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.monitoring.TableMetricsManager; import org.apache.phoenix.optimize.Cost; -import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AddJarsStatement; import org.apache.phoenix.parse.AliasedNode; @@ -146,7 +145,6 @@ import org.apache.phoenix.parse.DeclareCursorStatement; import org.apache.phoenix.parse.DeleteJarStatement; import org.apache.phoenix.parse.DeleteStatement; import org.apache.phoenix.parse.ExplainType; -import org.apache.phoenix.parse.FunctionParseNode; import org.apache.phoenix.parse.ShowCreateTableStatement; import org.apache.phoenix.parse.ShowCreateTable; import org.apache.phoenix.parse.DropColumnStatement; @@ -1082,12 +1080,10 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable private static class ExecutableCreateCDCStatement extends CreateCDCStatement implements CompilableStatement { public ExecutableCreateCDCStatement(NamedNode cdcObjName, TableName dataTable, - ColumnName timeIdxColumn, FunctionParseNode tfunc, Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String, Pair<String, Object>> props, boolean ifNotExists, int bindCount) { - super(cdcObjName, dataTable, timeIdxColumn, tfunc, includeScopes, props, ifNotExists, - bindCount); + super(cdcObjName, dataTable, includeScopes, props, ifNotExists, bindCount); } @Override @@ -1594,7 +1590,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable @Override public MutationState execute() throws SQLException { String indexName = ExecutableDropIndexStatement.this.getIndexName().getName(); - if (CDCUtil.isACDCIndex(indexName)) { + if (CDCUtil.isCDCIndex(indexName)) { throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX) .setTableName(indexName) .build().buildException(); @@ -1940,11 +1936,10 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable @Override public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable, - ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc, Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String, Pair<String, Object>> props, boolean ifNotExists, int bindCount) { - return new ExecutableCreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc, + return new ExecutableCreateCDCStatement(cdcObj, dataTable, includeScopes, props, ifNotExists, bindCount); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 5d81f0b585..48a5734149 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -220,10 +220,9 @@ public class QueryOptimizer { } PTable table = dataPlan.getTableRef().getTable(); - // TODO: Need to handle CDC hints. if (table.getType() == PTableType.CDC) { Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes(); - String cdcHint = select.getHint().getHint(Hint.INCLUDE); + String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE); if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) { cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1, cdcHint.length() - 1)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java index 9b1468a3ef..5722ab2a20 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java @@ -28,21 +28,16 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap; public class CreateCDCStatement extends MutableStatement { private final NamedNode cdcObjName; private final TableName dataTable; - private final ColumnName timeIdxColumn; - private final FunctionParseNode timeIdxFunc; private final Set<PTable.CDCChangeScope> includeScopes; private final ListMultimap<String,Pair<String,Object>> props; private final boolean ifNotExists; private final int bindCount; - public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable, ColumnName timeIdxColumn, - FunctionParseNode timeIdxFunc, + public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable, Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String, Pair<String, Object>> props, boolean ifNotExists, int bindCount) { this.cdcObjName = cdcObjName; this.dataTable = dataTable; - this.timeIdxColumn = timeIdxColumn; - this.timeIdxFunc = timeIdxFunc; this.includeScopes = includeScopes; this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props; this.ifNotExists = ifNotExists; @@ -57,14 +52,6 @@ public class CreateCDCStatement extends MutableStatement { return dataTable; } - public ColumnName getTimeIdxColumn() { - return timeIdxColumn; - } - - public FunctionParseNode getTimeIdxFunc() { - return timeIdxFunc; - } - public Set<PTable.CDCChangeScope> getIncludeScopes() { return includeScopes; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java index a80b092956..6c1f97b7fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java @@ -133,7 +133,7 @@ public class HintNode { /** * Override the default CDC include scopes. */ - INCLUDE, + CDC_INCLUDE, ; }; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 390ae10ade..17ffa6a3f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -377,11 +377,10 @@ public class ParseNodeFactory { } public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable, - ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc, Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String, Pair<String, Object>> props, boolean ifNotExists, int bindCount) { - return new CreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc, includeScopes, + return new CreateCDCStatement(cdcObj, dataTable, includeScopes, props, ifNotExists, bindCount); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 76108a6220..9717430a11 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -19,7 +19,6 @@ package org.apache.phoenix.schema; import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE; import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY; -import static org.apache.phoenix.exception.SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC; import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME; @@ -161,6 +160,7 @@ import java.util.Set; import java.util.HashSet; import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction; import org.apache.phoenix.parse.CreateCDCStatement; import org.apache.phoenix.parse.DropCDCStatement; import org.apache.hadoop.hbase.client.Table; @@ -1731,15 +1731,9 @@ public class MetaDataClient { } public MutationState createCDC(CreateCDCStatement statement) throws SQLException { - // TODO: Do we need to borrow the schema name of the table? ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection); TableRef tableRef = resolver.getTables().get(0); PTable dataTable = tableRef.getTable(); - // Check if data table is a view and give a not supported error. - if (dataTable.getType() != TABLE) { - throw new SQLExceptionInfo.Builder(INVALID_TABLE_TYPE_FOR_CDC).setTableType( - dataTable.getType()).build().buildException(); - } Map<String, Object> tableProps = Maps.newHashMapWithExpectedSize( statement.getProps().size()); @@ -1749,12 +1743,9 @@ public class MetaDataClient { NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName( statement.getCdcObjName().getName())); - String timeIdxColName = statement.getTimeIdxColumn() != null ? - statement.getTimeIdxColumn().getColumnName() : null; IndexKeyConstraint indexKeyConstraint = FACTORY.indexKey(Arrays.asList(new Pair[]{Pair.newPair( - timeIdxColName != null ? FACTORY.column(statement.getDataTable(), - timeIdxColName, timeIdxColName) : statement.getTimeIdxFunc(), + FACTORY.function(PhoenixRowTimestampFunction.NAME, Collections.emptyList()), SortOrder.getDefault())})); IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps); ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create(); @@ -1763,13 +1754,10 @@ public class MetaDataClient { TableProperty.SALT_BUCKETS.getPropertyName(), TableProperty.SALT_BUCKETS.getValue(tableProps))); } - // TODO: Transfer TTL and MaxLookback from statement.getProps() to indexProps. CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null, statement.getDataTable(), (Double) null), indexKeyConstraint, null, null, - indexProps, statement.isIfNotExists(), indexType, false, 0, + indexProps, statement.isIfNotExists(), indexType, true, 0, new HashMap<>(), null); - // TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to - // protect based on CDCUtil.isACDCIndex(). MutationState indexMutationState; try { // TODO: Should we also allow PTimestamp here, in fact PTimestamp is the right type, @@ -1782,24 +1770,18 @@ public class MetaDataClient { statement.getCdcObjName().getName()).setRootCause( e).build().buildException(); } - // TODO: What about translating other index creation failures? E.g., bad TS column. throw e; } List<PColumn> pkColumns = dataTable.getPKColumns(); List<ColumnDef> columnDefs = new ArrayList<>(); List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>(); - // TODO: toString() on function will have an extra space at the beginning, but this may - // be OK as I see exactly the same with an index. - ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ? - statement.getTimeIdxColumn() : - FACTORY.columnName(statement.getTimeIdxFunc().toString()); + ColumnName timeIdxCol = FACTORY.columnName(PhoenixRowTimestampFunction.NAME + "()"); columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false, PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false, SortOrder.getDefault(), "", null, false)); pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false)); for (PColumn pcol : pkColumns) { - // TODO: Cross check with the ColumnName creation logic in createIndex (line ~1578). columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()), pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(), pcol.getScale(), false, pcol.getSortOrder(), "", null, false)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 64ab453492..c64de2ef16 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -1104,11 +1104,5 @@ public interface PTable extends PMetaDataEntity { * Include only the post image (state past the change) of the row. */ POST, - - /** - * Include only the latest image of the row. - */ - LATEST, - ; } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index 369d349395..681b181283 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -353,7 +353,8 @@ public enum TableProperty { } }, - INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) { + INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, + true, false, false) { @Override public Object getPTableValue(PTable table) { return table.getCDCIncludeScopes(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java index e80fd4ee09..002da0a9c5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java @@ -18,9 +18,7 @@ package org.apache.phoenix.util; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.sql.SQLException; import java.util.HashSet; import java.util.Map; import java.util.NavigableSet; @@ -28,15 +26,12 @@ import java.util.Set; import java.util.StringTokenizer; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.PTable; -import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES; - public class CDCUtil { public static final String CDC_INDEX_PREFIX = "__CDC__"; public static final String CDC_INDEX_TYPE_LOCAL = "L"; @@ -48,7 +43,8 @@ public class CDCUtil { * @param includeScopes Comma-separated scope names. * @return the set of enums, which can be empty if the string is empty or has no valid names. */ - public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes) { + public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes) + throws SQLException { Set<PTable.CDCChangeScope> cdcChangeScopes = new HashSet<>(); if (includeScopes != null) { StringTokenizer st = new StringTokenizer(includeScopes, ","); @@ -58,7 +54,9 @@ public class CDCUtil { cdcChangeScopes.add(PTable.CDCChangeScope.valueOf(tok.trim().toUpperCase())); } catch (IllegalArgumentException e) { - // Just ignore unrecognized scopes. + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE).setCdcChangeScope( + tok).build().buildException(); } } } @@ -90,12 +88,12 @@ public class CDCUtil { return indexName.substring(CDC_INDEX_PREFIX.length()); } - public static boolean isACDCIndex(String indexName) { + public static boolean isCDCIndex(String indexName) { return indexName.startsWith(CDC_INDEX_PREFIX); } - public static boolean isACDCIndex(PTable indexTable) { - return isACDCIndex(indexTable.getTableName().getString()); + public static boolean isCDCIndex(PTable indexTable) { + return isCDCIndex(indexTable.getTableName().getString()); } public static Scan initForRawScan(Scan scan) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java index 829cb73ed0..71020bb659 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java @@ -497,47 +497,38 @@ public class QueryParserTest { } } - private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists, String tsCol) + private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists) throws Exception { CreateCDCStatement stmt = parseQuery(sql, CreateCDCStatement.class); assertEquals("FOO", stmt.getCdcObjName().getName()); assertEquals("BAR", stmt.getDataTable().getTableName()); - if (tsCol != null) { - assertEquals(tsCol, stmt.getTimeIdxColumn().getColumnName()); - } - else { - assertNull(stmt.getTimeIdxColumn()); - } assertEquals(ifNotExists, stmt.isIfNotExists()); return stmt; } @Test public void testCreateCDCSimple() throws Exception { + parseCreateCDCSimple("create cdc foo on bar", false); + parseCreateCDCSimple("create cdc foo on s.bar", false); + parseCreateCDCSimple("create cdc if not exists foo on bar", true); + parseCreateCDCSimple("create cdc foo on bar index_type=g", false); + parseCreateCDCSimple("create cdc foo on bar index_type=l", false); CreateCDCStatement stmt = null; - parseCreateCDCSimple("create cdc foo on bar(ts)", false, "TS"); - parseCreateCDCSimple("create cdc foo on s.bar(ts)", false, "TS"); - parseCreateCDCSimple("create cdc if not exists foo on bar(ts)", true, "TS"); - parseCreateCDCSimple("create cdc foo on bar(t) index_type=g", false, "T"); - parseCreateCDCSimple("create cdc foo on bar(t) index_type=l", false, "T"); - stmt = parseCreateCDCSimple("create cdc foo on bar(TS_FUNC()) TTL=100, INDEX_TYPE=g", - false, null); - assertEquals("TS_FUNC", stmt.getTimeIdxFunc().getName()); - assertEquals(" TS_FUNC()", stmt.getTimeIdxFunc().toString()); + stmt = parseCreateCDCSimple("create cdc foo on bar TTL=100, INDEX_TYPE=g", false); assertEquals(Arrays.asList(new Pair("TTL", 100), new Pair("INDEX_TYPE", "g")), stmt.getProps().get("")); - stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre)", false, "TS"); + stmt = parseCreateCDCSimple("create cdc foo on bar include (pre)", false); assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)), stmt.getIncludeScopes()); - stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, pre, post)", - false, "TS"); + stmt = parseCreateCDCSimple("create cdc foo on bar include (pre, pre, post)", + false); assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), stmt.getIncludeScopes()); - stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def", - true, "TS"); + stmt = parseCreateCDCSimple("create cdc if not exists foo on bar abc=def", + true); assertEquals(Arrays.asList(new Pair("ABC", "def")), stmt.getProps().get("")); - stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def, prop=val", - true, "TS"); + stmt = parseCreateCDCSimple("create cdc if not exists foo on bar abc=def, prop=val", + true); assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", "val")), stmt.getProps().get("")); } @@ -545,10 +536,7 @@ public class QueryParserTest { @Test public void testCreateCDCWithErrors() throws Exception { parseQueryThatShouldFail("create cdc foo"); - parseQueryThatShouldFail("create cdc foo on bar"); - parseQueryThatShouldFail("create cdc foo on bar(ts integer)"); - parseQueryThatShouldFail("create cdc foo on bar(ts1, ts2)"); - parseQueryThatShouldFail("create cdc foo on bar(ts) include (abc)"); + parseQueryThatShouldFail("create cdc foo on bar include (abc)"); } private void parseInvalidCreateCDC(String sql, int expRrrorCode) throws IOException { @@ -563,7 +551,6 @@ public class QueryParserTest { @Test public void testInvalidCreateCDC() throws Exception { - parseInvalidCreateCDC("create cdc foo on bar", SQLExceptionCode.MISMATCHED_TOKEN.getErrorCode()); parseInvalidCreateCDC("create cdc foo bar", SQLExceptionCode.MISSING_TOKEN.getErrorCode()); parseInvalidCreateCDC("create cdc foo bar ts", SQLExceptionCode.MISSING_TOKEN.getErrorCode()); parseInvalidCreateCDC("create cdc foo bar(ts)", SQLExceptionCode.MISSING_TOKEN.getErrorCode()); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java index e14fed4810..7feb261dc8 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java @@ -18,9 +18,11 @@ package org.apache.phoenix.util; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.schema.PTable; import org.junit.Test; +import java.sql.SQLException; import java.util.Arrays; import java.util.HashSet; @@ -38,10 +40,15 @@ public class CDCUtilTest { CDCUtil.makeChangeScopeEnumsFromString("PRE,")); assertEquals(new HashSet<>(Arrays.asList(PRE)), CDCUtil.makeChangeScopeEnumsFromString("PRE, PRE")); - assertEquals(new HashSet<>(Arrays.asList(PRE)), - CDCUtil.makeChangeScopeEnumsFromString("PRE,DUMMY")); - assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)), - CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE,LATEST")); + assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST)), + CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE")); + try { + CDCUtil.makeChangeScopeEnumsFromString("DUMMY"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getMessage().endsWith("DUMMY")); + } } @Test @@ -49,9 +56,7 @@ public class CDCUtilTest { assertEquals(null, CDCUtil.makeChangeScopeStringFromEnums(null)); assertEquals("", CDCUtil.makeChangeScopeStringFromEnums( new HashSet<PTable.CDCChangeScope>())); - assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums( - new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)))); - assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums( - new HashSet<>(Arrays.asList(PRE, LATEST, POST, CHANGE)))); + assertEquals("CHANGE,PRE,POST", CDCUtil.makeChangeScopeStringFromEnums( + new HashSet<>(Arrays.asList(CHANGE, PRE, POST)))); } }