palashc commented on code in PR #2033:
URL: https://github.com/apache/phoenix/pull/2033#discussion_r1872792630
##########
phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java:
##########
@@ -2039,9 +2041,57 @@ public MutationState createCDC(CreateCDCStatement
statement) throws SQLException
createTableInternal(tableStatement, null, dataTable, null, null, null,
null,
null, null, false, null,
null, statement.getIncludeScopes(), tableProps,
commonFamilyProps);
+ // for now, only track stream partition metadata for tables, TODO:
updatable views
+ if (PTableType.TABLE.equals(dataTable.getType())) {
+ updateStreamPartitionMetadata(dataTableFullName);
+ }
return new MutationState(0, 0, connection);
}
+ /**
+ * Triggers the CDC stream partition metadata bootstrap for the given table in the background.
+ * Marks the stream status as ENABLING in SYSTEM.CDC_STREAM_STATUS and adds a
+ * {@link CdcStreamPartitionMetadataTask} to SYSTEM.TASK, which updates partition metadata
+ * based on table regions.
+ */
+ private void updateStreamPartitionMetadata(String tableName) throws
SQLException {
+ long cdcIndexTimestamp =
CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName));
+ String streamStatusSQL = "UPSERT INTO " +
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+ PreparedStatement ps = connection.prepareStatement(streamStatusSQL);
+ String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName,
cdcIndexTimestamp);
+ ps.setString(1, tableName);
+ ps.setString(2, streamName);
+ ps.setString(3, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
+ ps.executeUpdate();
+ connection.commit();
+
+ try {
+ List<Mutation> sysTaskUpsertMutations =
Task.getMutationsForAddTask(
+ new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connection)
+ .setTaskType(PTable.TaskType.CDC_STREAM_PARTITION)
+ .setTableName(tableName) //give full table name
+ .setSchemaName(streamName) // use schemaName to pass
streamName
+ .build());
+ byte[] rowKey = sysTaskUpsertMutations
+ .get(0).getRow();
+ MetaDataProtocol.MetaDataMutationResult metaDataMutationResult =
+ Task.taskMetaDataCoprocessorExec(connection, rowKey,
+ new
TaskMetaDataServiceCallBack(sysTaskUpsertMutations));
+ if (MetaDataProtocol.MutationCode.UNABLE_TO_UPSERT_TASK.equals(
+ metaDataMutationResult.getMutationCode())) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+ .setSchemaName(SYSTEM_SCHEMA_NAME)
+
.setTableName(SYSTEM_TASK_TABLE).build().buildException();
+ }
+ } catch (IOException ioe) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+ .setRootCause(ioe)
+ .setMessage(ioe.getMessage())
+ .setSchemaName(SYSTEM_SCHEMA_NAME)
+ .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+ }
Review Comment:
@haridsv See [PHOENIX-7460](https://issues.apache.org/jira/browse/PHOENIX-7460),
which tracks keeping region metadata in sync.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]