haridsv commented on code in PR #2033: URL: https://github.com/apache/phoenix/pull/2033#discussion_r1879672456
########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.coprocessor.tasks; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.ServerTask; +import org.apache.phoenix.schema.task.SystemTaskParams; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.QueryUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME; 
+import static org.apache.phoenix.query.QueryServices.PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT; + +/** + * Task to bootstrap partition metadata when CDC is enabled on a table. + * Upserts one row for each region of the table into SYSTEM.CDC_STREAM and marks the status as + * ENABLED in SYSTEM.CDC_STREAM_STATUS. + */ +public class CdcStreamPartitionMetadataTask extends BaseTask { + + public static final Logger LOGGER = LoggerFactory.getLogger(CdcStreamPartitionMetadataTask.class); + private static final String CDC_STREAM_STATUS_UPSERT_SQL + = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; + + // parent_partition_id will be null, set partition_end_time to -1 + private static final String CDC_STREAM_PARTITION_UPSERT_SQL + = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,null,?,-1,?,?)"; + + @Override + public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) { + Configuration conf = HBaseConfiguration.create(env.getConfiguration()); + Configuration configuration = HBaseConfiguration.addHbaseResources(conf); + int getTableRegionsTimeout = configuration.getInt(PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT, + DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT); + PhoenixConnection pconn = null; + String tableName = taskRecord.getTableName(); + String streamName = taskRecord.getSchemaName(); + Timestamp timestamp = taskRecord.getTimeStamp(); + try { + pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + List<HRegionLocation> tableRegions = pconn.getQueryServices().getAllTableRegions( + tableName.getBytes(), getTableRegionsTimeout); Review Comment: If there is a timeout due to the table having a large number of regions, it may continue to timeout even in retries. 
How about we query for the PARTITION_END_KEY of the last partition and use `getTableRegions` to start from that key? We could load any existing partition records for this stream first (which will be empty the first time so we will default to `HConstants.EMPTY_START_ROW`), sort them and get the highest end key. The end key for `getTableRegions` would also be `HConstants.EMPTY_START_ROW`. ########## phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java: ########## @@ -2032,16 +2038,81 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), Boolean.TRUE); } CreateTableStatement tableStatement = FACTORY.createTable( - FACTORY.table(dataTable.getSchemaName().getString(), statement.getCdcObjName().getName()), + FACTORY.table(dataTable.getSchemaName().getString(), cdcObjName), null, columnDefs, FACTORY.primaryKey(null, pkColumnDefs), Collections.emptyList(), PTableType.CDC, statement.isIfNotExists(), null, null, statement.getBindCount(), null); createTableInternal(tableStatement, null, dataTable, null, null, null, null, null, null, false, null, null, statement.getIncludeScopes(), tableProps, commonFamilyProps); + // for now, only track stream partition metadata for tables, TODO: updatable views + if (PTableType.TABLE == dataTable.getType()) { + updateStreamPartitionMetadata(dataTableFullName, cdcObjName); + } return new MutationState(0, 0, connection); } + /** + * Trigger CDC Stream Partition metadata bootstrap for the given table in the background. + * Mark status as ENABLING in SYSTEM.CDC_STREAM_STATUS and add {@link CdcStreamPartitionMetadataTask} + * to SYSTEM.TASK which updates partition metadata based on table regions. 
+ */ + private void updateStreamPartitionMetadata(String tableName, String cdcObjName) throws SQLException { + // create Stream with ENABLING status + long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName)); + String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; + String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcObjName, cdcIndexTimestamp); + try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) { + ps.setString(1, tableName); + ps.setString(2, streamName); + ps.setString(3, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue()); + ps.executeUpdate(); + connection.commit(); + LOGGER.info("Marked stream {} for table {} as ENABLING", streamName, tableName); + } + + // insert task to update partition metadata for stream + try { + List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask( + new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(connection) + .setTaskType(PTable.TaskType.CDC_STREAM_PARTITION) + .setTableName(tableName) //give full table name + .setSchemaName(streamName) // use schemaName to pass streamName + .build()); Review Comment: Shouldn't we set the status to CREATED? 
########## phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java: ########## @@ -3942,11 +4013,22 @@ public MutationState dropCDC(DropCDCStatement statement) throws SQLException { String schemaName = statement.getTableName().getSchemaName(); String cdcTableName = statement.getCdcObjName().getName(); String parentTableName = statement.getTableName().getTableName(); + String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName()); + // Mark CDC Stream as Disabled + long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp(); + String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; + String streamName = String.format(CDC_STREAM_NAME_FORMAT, parentTableName, cdcTableName, cdcIndexTimestamp); + try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) { + ps.setString(1, parentTableName); + ps.setString(2, streamName); + ps.setString(3, CDCUtil.CdcStreamStatus.DISABLED.getSerializedValue()); + ps.executeUpdate(); + connection.commit(); + LOGGER.info("Marked stream {} for table {} as DISABLED", streamName, parentTableName); + } Review Comment: We should do this after the CDC drop is successful, maybe even after the index drop. 
########## phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java: ########## @@ -150,4 +154,30 @@ public static boolean isBinaryType(PDataType dataType) { || sqlType == Types.LONGVARBINARY || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE); } + + public enum CdcStreamStatus { + ENABLED("ENABLED"), + ENABLING("ENABLING"), + DISABLED("DISABLED"), + DISABLING("DISABLING"); + + private final String serializedValue; + + private CdcStreamStatus(String value) { + this.serializedValue = value; + } + + public String getSerializedValue() { + return serializedValue; + } + } + + public static long getCDCCreationTimestamp(PTable table) { + for (PTable index : table.getIndexes()) { + if (CDCUtil.isCDCIndex(index)) { + return index.getTimeStamp(); + } + } Review Comment: This method is misleading: it returns the timestamp of the first CDC index it finds. Since we can currently have multiple CDC indexes, it may return a different value at different times. In theory, this can make `updateStreamPartitionMetadata` reuse a previous stream name instead of assigning a new one, and make `dropCDC` try to update the status of the wrong stream. I am thinking we can do the following as a quick workaround: 1. We can change `CDC_STREAM_NAME_FORMAT` to just `<table name>-UUID` and avoid depending on this method in `updateStreamPartitionMetadata`. 2. In `dropCDC`, query for the name of the ENABLED/ENABLING stream to change its status, rather than constructing the name from `CDC_STREAM_NAME_FORMAT`. 3. In the ITs, we would assert on the existence of a stream with a specific status rather than on the specific name. ########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.coprocessor.tasks; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.ServerTask; +import org.apache.phoenix.schema.task.SystemTaskParams; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.QueryUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME; +import static org.apache.phoenix.query.QueryServices.PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT; + +/** + * Task to bootstrap partition metadata when CDC is enabled on a table. 
+ * Upserts one row for each region of the table into SYSTEM.CDC_STREAM and marks the status as + * ENABLED in SYSTEM.CDC_STREAM_STATUS. + */ +public class CdcStreamPartitionMetadataTask extends BaseTask { + + public static final Logger LOGGER = LoggerFactory.getLogger(CdcStreamPartitionMetadataTask.class); + private static final String CDC_STREAM_STATUS_UPSERT_SQL + = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; + + // parent_partition_id will be null, set partition_end_time to -1 + private static final String CDC_STREAM_PARTITION_UPSERT_SQL + = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,null,?,-1,?,?)"; + + @Override + public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) { + Configuration conf = HBaseConfiguration.create(env.getConfiguration()); + Configuration configuration = HBaseConfiguration.addHbaseResources(conf); + int getTableRegionsTimeout = configuration.getInt(PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT, + DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT); + PhoenixConnection pconn = null; + String tableName = taskRecord.getTableName(); + String streamName = taskRecord.getSchemaName(); + Timestamp timestamp = taskRecord.getTimeStamp(); + try { + pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + List<HRegionLocation> tableRegions = pconn.getQueryServices().getAllTableRegions( + tableName.getBytes(), getTableRegionsTimeout); + upsertPartitionMetadata(pconn, tableName, streamName, tableRegions); + updateStreamStatus(pconn, tableName, streamName); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, ""); + } catch (SQLException e) { + try { + // Update task status to RETRY so that it is retried + ServerTask.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(pconn) + .setTaskType(taskRecord.getTaskType()) + .setSchemaName(taskRecord.getSchemaName()) + .setTableName(taskRecord.getTableName()) + 
.setTaskStatus(PTable.TaskStatus.RETRY.toString()) + .setStartTs(taskRecord.getTimeStamp()) + .setEndTs(null) + .build()); + LOGGER.error("Marking task as RETRY. " + Review Comment: Shouldn't this be a warning, since we are not giving up yet? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
