palashc commented on code in PR #2057:
URL: https://github.com/apache/phoenix/pull/2057#discussion_r1926517604
##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -95,52 +101,92 @@ public void postCompletedSplitRegionAction(final
ObserverContext<MasterCoprocess
regionInfoA.getTable());
return;
}
- // find streamName with ENABLED status
String tableName = phoenixTable.getName().getString();
- PreparedStatement pstmt =
conn.prepareStatement(STREAM_STATUS_QUERY);
- pstmt.setString(1, tableName);
- ResultSet rs = pstmt.executeQuery();
- if (rs.next()) {
- String streamName = rs.getString(1);
- LOGGER.info("Updating partition metadata for table={},
stream={} daughters {} {}",
+ String streamName = getStreamName(conn, tableName);
+ if (streamName != null) {
+ LOGGER.info("Updating split partition metadata for table={},
stream={} daughters {} {}",
tableName, streamName, regionInfoA.getEncodedName(),
regionInfoB.getEncodedName());
- // ancestorIDs = [parentId, grandparentId1]
- List<String> ancestorIDs = getAncestorIds(conn, tableName,
streamName, regionInfoA, regionInfoB);
- upsertDaughterPartition(conn, tableName, streamName,
ancestorIDs.get(0), regionInfoA);
- upsertDaughterPartition(conn, tableName, streamName,
ancestorIDs.get(0), regionInfoB);
- updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs, regionInfoA.getRegionId());
+ // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
+ List<String> ancestorIDs
+ = getAncestorIdsForSplit(conn, tableName, streamName,
regionInfoA, regionInfoB);
+
+ upsertDaughterPartitions(conn, tableName, streamName,
ancestorIDs.subList(0, 1),
+ Arrays.asList(regionInfoA, regionInfoB));
+
+ updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs,
+ regionInfoA.getRegionId());
+ } else {
+ LOGGER.info("{} does not have a stream enabled, skipping
partition metadata update.",
+ regionInfoA.getTable());
}
} catch (SQLException e) {
LOGGER.error("Unable to update CDC Stream Partition metadata
during split with daughter regions: {} {}",
regionInfoA.getEncodedName(),
regionInfoB.getEncodedName(), e);
}
}
- private PTable getPhoenixTable(Connection conn, TableName tableName)
throws SQLException {
- PTable pTable;
- try {
- pTable = PhoenixRuntime.getTable(conn, tableName.toString());
- } catch (TableNotFoundException e) {
- return null;
+ /**
+ * Update parent -> daughter relationship for CDC Streams when regions
merge.
+ * - upsert partition metadata for the daughter with each parent
+ * - update the end time on all the parents' partition metadata
+ * @param c the environment to interact with the framework and master
+ * @param regionsToMerge parent regions which merged
+ * @param mergedRegion daughter region
+ */
+ @Override
+ public void postCompletedMergeRegionsAction(final
ObserverContext<MasterCoprocessorEnvironment> c,
+ final RegionInfo[]
regionsToMerge,
+ final RegionInfo mergedRegion)
{
+ Configuration conf = c.getEnvironment().getConfiguration();
+ try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
+ // CDC will be enabled on Phoenix tables only
+ PTable phoenixTable = getPhoenixTable(conn,
mergedRegion.getTable());
+ if (phoenixTable == null) {
+ LOGGER.info("{} is not a Phoenix Table, skipping partition
metadata update.",
+ mergedRegion.getTable());
+ return;
+ }
+ String tableName = phoenixTable.getName().getString();
+ String streamName = getStreamName(conn, tableName);
+ if (streamName != null) {
+ LOGGER.info("Updating merged partition metadata for table={},
stream={} daughter {}",
+ tableName, streamName, mergedRegion.getEncodedName());
+ // upsert a row for daughter-parent for each merged region
+ upsertDaughterPartitions(conn, tableName, streamName,
+
Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
+ Arrays.asList(mergedRegion));
+
+ // lookup all ancestors of a merged region and update the
endTime
+ for (RegionInfo ri : regionsToMerge) {
+ List<String> ancestorIDs = getAncestorIdsForMerge(conn,
tableName, streamName, ri);
+ updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs,
+ mergedRegion.getRegionId());
Review Comment:
@haridsv During merges, we decided to insert one row for the daughter per
parent, because the parent partition ID is part of the primary key.
A parent in a merge can itself be the result of an earlier merge, so it
can already have multiple rows in the table (one per its own parent). All
of those rows need to be updated with the end time, which is why we look up
the ancestors of each merged region here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]