haridsv commented on code in PR #2057:
URL: https://github.com/apache/phoenix/pull/2057#discussion_r1928489141


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -95,52 +101,92 @@ public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocess
                         regionInfoA.getTable());
                 return;
             }
-            // find streamName with ENABLED status
             String tableName = phoenixTable.getName().getString();
-            PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
-            pstmt.setString(1, tableName);
-            ResultSet rs = pstmt.executeQuery();
-            if (rs.next()) {
-                String streamName = rs.getString(1);
-                LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}",
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}",
                         tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName());
-                // ancestorIDs = [parentId, grandparentId1]
-                List<String> ancestorIDs = getAncestorIds(conn, tableName, streamName, regionInfoA, regionInfoB);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoA);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoB);
-                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId());
+                // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
+                List<String> ancestorIDs
+                        = getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB);
+
+                upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1),
+                        Arrays.asList(regionInfoA, regionInfoB));
+
+                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                        regionInfoA.getRegionId());
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        regionInfoA.getTable());
             }
         } catch (SQLException e) {
             LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}",
                     regionInfoA.getEncodedName(), regionInfoB.getEncodedName(), e);
         }
     }
 
-    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
-        PTable pTable;
-        try {
-            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
-        } catch (TableNotFoundException e) {
-            return null;
+    /**
+     * Update parent -> daughter relationship for CDC Streams when regions merge.
+     * - upsert partition metadata for the daughter with each parent
+     * - update the end time on all the parents' partition metadata
+     * @param c the environment to interact with the framework and master
+     * @param regionsToMerge parent regions which merged
+     * @param mergedRegion daughter region
+     */
+    @Override
+    public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoprocessorEnvironment> c,
+                                                final RegionInfo[] regionsToMerge,
+                                                final RegionInfo mergedRegion) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try (Connection conn  = QueryUtil.getConnectionOnServer(conf)) {
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.",
+                        mergedRegion.getTable());
+                return;
+            }
+            String tableName = phoenixTable.getName().getString();
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}",
+                        tableName, streamName, mergedRegion.getEncodedName());
+                // upsert a row for daughter-parent for each merged region
+                upsertDaughterPartitions(conn, tableName, streamName,
+                        Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
+                        Arrays.asList(mergedRegion));
+
+                // lookup all ancestors of a merged region and update the endTime
+                for (RegionInfo ri : regionsToMerge) {
+                    List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri);
+                    updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                            mergedRegion.getRegionId());

Review Comment:
   By looking up the parent of the parent, we are actually updating the grandparents, right?
   Is that really the intention? See my other comment visualizing the operations and let me
   know what I am missing.
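   To make the question concrete, here is roughly how I am picturing this path (region names
   are made up, and I am only guessing at what getAncestorIdsForMerge returns based on its
   name):

   ```java
   // Hypothetical lineage: A and B merge into C, so regionsToMerge = [A, B]
   // and mergedRegion = C.
   //
   // for (RegionInfo ri : regionsToMerge) {        // ri = A, then B
   //     // presumably looks up ri's own parents, i.e. C's grandparents
   //     List<String> ancestorIDs = getAncestorIdsForMerge(..., ri);
   //     updateParentPartitionEndTime(..., ancestorIDs, mergedRegion.getRegionId());
   // }
   //
   // If that reading is right, the rows whose end time is set here belong to
   // C's grandparents rather than (or in addition to) A and B themselves.
   ```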



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -60,9 +62,13 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     private static final String PARTITION_UPSERT_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)";

Review Comment:
   It would be nice to list the column names here to make the code easier to read, instead of
   having to jump to QueryConstants for the names and order of the columns.
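   Something along these lines, purely as a sketch -- the column names below are from memory
   of the SYSTEM.CDC_STREAM definition in QueryConstants, so the actual names and order would
   need to be double-checked:

   ```java
   // Illustrative only: column list assumed, verify against QueryConstants.
   private static final String PARTITION_UPSERT_SQL
           = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME
           + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, PARENT_PARTITION_ID,"
           + " PARTITION_START_TIME, PARTITION_END_TIME,"
           + " PARTITION_START_KEY, PARTITION_END_KEY)"
           + " VALUES (?,?,?,?,?,?,?,?)";
   ```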



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -173,50 +219,113 @@ private List<String> getAncestorIds(Connection conn, String tableName, String st
                     Bytes.toStringBinary(regionInfoB.getStartKey()),
                     Bytes.toStringBinary(regionInfoB.getEndKey())));
         }
+        // if parent was a result of a merge, there will be multiple grandparents.
+        while (rs.next()) {
+            ancestorIDs.add(rs.getString(2));
+        }

Review Comment:
   I didn't get the part about grandparents, but then again I must be missing something.
   Please see my other comment with the visualization and let me know what I am missing.
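   If I follow the intent, the case this loop covers is when the parent being split was itself
   produced by a merge, so it has one parent row per merged region. A minimal sketch of that
   lineage (region names hypothetical, and I am assuming the ancestor query returns one row per
   such parent row):

   ```java
   // G1 + G2  --merge-->  P      (P gets two parent rows: one for G1, one for G2)
   // P        --split-->  D1, D2
   //
   // The ancestor query for D1/D2 would then return multiple rows for P,
   // and this loop collects both grandparents [G1, G2] into ancestorIDs
   // after the parent id at index 0.
   ```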


