This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e6a471ac PHOENIX-7639 Improve error handling in PhoenixMasterObserver (#2191)
e3e6a471ac is described below

commit e3e6a471ac1e5c8342c7cf98cc58b8419a89f9db
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Jun 13 15:54:20 2025 -0700

    PHOENIX-7639 Improve error handling in PhoenixMasterObserver (#2191)
---
 .../MetricsPhoenixCoprocessorSourceFactory.java    |  12 ++
 .../metrics/MetricsPhoenixMasterSource.java        |  60 ++++++
 .../metrics/MetricsPhoenixMasterSourceImpl.java    |  69 +++++++
 .../phoenix/schema/ParentPartitionNotFound.java    |  37 ++++
 .../phoenix/coprocessor/PhoenixMasterObserver.java | 210 ++++++++++++++-------
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java |  97 +++++++++-
 .../org/apache/phoenix/end2end/CDCStream2IT.java   | 120 ++++++++++++
 .../org/apache/phoenix/end2end/CDCStreamIT.java    |  98 +++++-----
 .../phoenix/end2end/TestPhoenixMasterObserver.java |  73 +++++++
 9 files changed, 654 insertions(+), 122 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
index 280a116f68..4009a39906 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
@@ -27,6 +27,7 @@ public class MetricsPhoenixCoprocessorSourceFactory {
     // Holds the PHOENIX_TTL related metrics.
     private static volatile MetricsPhoenixTTLSource phoenixTTLSource;
     private static volatile MetricsMetadataCachingSource metadataCachingSource;
+    private static volatile MetricsPhoenixMasterSource phoenixMasterSource;
 
     public static MetricsPhoenixCoprocessorSourceFactory getInstance() {
         return INSTANCE;
@@ -54,4 +55,15 @@ public class MetricsPhoenixCoprocessorSourceFactory {
         }
         return INSTANCE.metadataCachingSource;
     }
+
+    public MetricsPhoenixMasterSource getPhoenixMasterSource() {
+        if (INSTANCE.phoenixMasterSource == null) {
+            synchronized (MetricsPhoenixMasterSource.class) {
+                if (INSTANCE.phoenixMasterSource == null) {
+                    INSTANCE.phoenixMasterSource = new 
MetricsPhoenixMasterSourceImpl();
+                }
+            }
+        }
+        return INSTANCE.phoenixMasterSource;
+    }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSource.java
new file mode 100644
index 0000000000..65d97a9e1a
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessorclient.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/**
+ * PhoenixMasterObserver metrics source.
+ */
+public interface MetricsPhoenixMasterSource extends BaseSource {
+
+    String METRICS_NAME = "PhoenixMasterObserver";
+    String METRICS_CONTEXT = "phoenix";
+    String METRICS_DESCRIPTION = "Metrics about the Phoenix Master 
Coprocessor";
+    String METRICS_JMX_CONTEXT = "Master,sub=" + METRICS_NAME;
+
+    String PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES = 
"phoenixPostSplitPartitionUpdateFailures";
+    String PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES_DESC =
+            "The number of failures during partition metadata updates after 
region splits";
+
+    String PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES = 
"phoenixPostMergePartitionUpdateFailures";
+    String PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES_DESC =
+            "The number of failures during partition metadata updates after 
region merges";
+
+    /**
+     * Return the number of failures during partition metadata updates after 
region splits.
+     */
+    long getPostSplitPartitionUpdateFailureCount();
+
+    /**
+     * Increment the number of failures during partition metadata updates 
after region splits.
+     */
+    void incrementPostSplitPartitionUpdateFailureCount();
+
+    /**
+     * Return the number of failures during partition metadata updates after 
region merges.
+     */
+    long getPostMergePartitionUpdateFailureCount();
+
+    /**
+     * Increment the number of failures during partition metadata updates 
after region merges.
+     */
+    void incrementPostMergePartitionUpdateFailureCount();
+} 
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSourceImpl.java
new file mode 100644
index 0000000000..921e56e852
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSourceImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessorclient.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+
+/**
+ * Implementation for tracking PhoenixMasterObserver metrics.
+ */
+public class MetricsPhoenixMasterSourceImpl extends BaseSourceImpl
+        implements MetricsPhoenixMasterSource {
+
+    private final MutableFastCounter postSplitPartitionUpdateFailures;
+    private final MutableFastCounter postMergePartitionUpdateFailures;
+
+    public MetricsPhoenixMasterSourceImpl() {
+        this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, 
METRICS_JMX_CONTEXT);
+    }
+
+    public MetricsPhoenixMasterSourceImpl(String metricsName, String 
metricsDescription,
+            String metricsContext, String metricsJmxContext) {
+        super(metricsName, metricsDescription, metricsContext, 
metricsJmxContext);
+
+        postSplitPartitionUpdateFailures = getMetricsRegistry().newCounter(
+                PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES,
+                PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES_DESC, 0L);
+
+        postMergePartitionUpdateFailures = getMetricsRegistry().newCounter(
+                PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES,
+                PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES_DESC, 0L);
+    }
+
+    @Override
+    public long getPostSplitPartitionUpdateFailureCount() {
+        return postSplitPartitionUpdateFailures.value();
+    }
+
+    @Override
+    public void incrementPostSplitPartitionUpdateFailureCount() {
+        postSplitPartitionUpdateFailures.incr();
+    }
+
+    @Override
+    public long getPostMergePartitionUpdateFailureCount() {
+        return postMergePartitionUpdateFailures.value();
+    }
+
+    @Override
+    public void incrementPostMergePartitionUpdateFailureCount() {
+        postMergePartitionUpdateFailures.incr();
+    }
+} 
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ParentPartitionNotFound.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ParentPartitionNotFound.java
new file mode 100644
index 0000000000..4779e920fc
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ParentPartitionNotFound.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema;
+
+/**
+ * Exception thrown when a parent partition cannot be found in the CDC stream 
metadata.
+ * This typically occurs during region split or merge operations when trying 
to update
+ * the parent-daughter relationship in the CDC stream metadata.
+ */
+public class ParentPartitionNotFound extends RuntimeException {
+
+    /**
+     * Creates a new ParentPartitionNotFound exception with the specified 
error message.
+     *
+     * @param message the error message.
+     */
+    public ParentPartitionNotFound(String message) {
+        super(message);
+    }
+
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index e335d7604a..c3856f60a3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -37,12 +38,17 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ParentPartitionNotFound;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import 
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import java.io.IOException;
 
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
@@ -51,7 +57,9 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_
  * Master Coprocessor for Phoenix.
  */
 public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+    private MetricsPhoenixMasterSource metricSource;
 
     private static final String STREAM_STATUS_QUERY
             = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
@@ -74,11 +82,21 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, "
             + "PARENT_PARTITION_ID, PARTITION_END_TIME) VALUES (?,?,?,?,?)";
 
+    public static final String PHOENIX_MASTER_MAX_RETRY_COUNT =
+            "phoenix.master.observer.max.retry.count";
+    public static final int DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT = 20;
+
     @Override
     public Optional<MasterObserver> getMasterObserver() {
         return Optional.of(this);
     }
 
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        metricSource =
+                
MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
+    }
+
     /**
      * Update parent -> daughter relationship for CDC Streams when a region 
splits.
      * - find parent partition id using start/end keys of daughters
@@ -91,38 +109,66 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     @Override
     public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
                                                final RegionInfo regionInfoA,
-                                               final RegionInfo regionInfoB) {
+                                               final RegionInfo regionInfoB) 
throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
-        try (Connection conn  = QueryUtil.getConnectionOnServer(conf)) {
-            // CDC will be enabled on Phoenix tables only
-            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
-            if (phoenixTable == null) {
-                LOGGER.info("{} is not a Phoenix Table, skipping partition 
metadata update.",
-                        regionInfoA.getTable());
-                return;
-            }
-            String tableName = phoenixTable.getName().getString();
-            String streamName = getStreamName(conn, tableName);
-            if (streamName != null) {
-                LOGGER.info("Updating split partition metadata for table={}, 
stream={} daughters {} {}",
-                        tableName, streamName, regionInfoA.getEncodedName(), 
regionInfoB.getEncodedName());
-                // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
-                List<String> ancestorIDs
-                        = getAncestorIdsForSplit(conn, tableName, streamName, 
regionInfoA, regionInfoB);
+        int maxRetryCount =
+                conf.getInt(PHOENIX_MASTER_MAX_RETRY_COUNT, 
DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT);
+        int tries = 0;
+        Exception caughtException;
+        do {
+            try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
+                // CDC will be enabled on Phoenix tables only
+                PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+                if (phoenixTable == null) {
+                    LOGGER.debug("{} is not a Phoenix Table, skipping 
partition metadata update.",
+                            regionInfoA.getTable());
+                    return;
+                }
+                String tableName = phoenixTable.getName().getString();
+                String streamName = getStreamName(conn, tableName);
+                if (streamName != null) {
+                    LOGGER.info(
+                            "Updating split partition metadata for table={}, 
stream={} "
+                                    + "daughters {} {}",
+                            tableName, streamName, 
regionInfoA.getEncodedName(),
+                            regionInfoB.getEncodedName());
+                    // ancestorIDs = [parentId, grandparentId1, 
grandparentId2...]
+                    List<String> ancestorIDs
+                            = getAncestorIdsForSplit(conn, tableName, 
streamName, regionInfoA,
+                            regionInfoB);
 
-                upsertDaughterPartitions(conn, tableName, streamName, 
ancestorIDs.subList(0, 1),
-                        Arrays.asList(regionInfoA, regionInfoB));
+                    upsertDaughterPartitions(conn, tableName, streamName, 
ancestorIDs.subList(0, 1),
+                            Arrays.asList(regionInfoA, regionInfoB));
 
-                updateParentPartitionEndTime(conn, tableName, streamName, 
ancestorIDs,
-                        regionInfoA.getRegionId());
-            } else {
-                LOGGER.info("{} does not have a stream enabled, skipping 
partition metadata update.",
-                        regionInfoA.getTable());
+                    updateParentPartitionEndTime(conn, tableName, streamName, 
ancestorIDs,
+                            regionInfoA.getRegionId());
+                    return;
+                } else {
+                    LOGGER.debug("{} does not have a stream enabled, skipping 
partition metadata"
+                            + " update.", regionInfoA.getTable());
+                    return;
+                }
+            } catch (ParentPartitionNotFound e) {
+                LOGGER.debug("Parent partition not found, skipping remaining 
steps.", e);
+                return;
+            } catch (TableNotFoundException e) {
+                LOGGER.warn("System Table not found during region split 
completion. "
+                        + "This must happen before metadata upgrade only.", e);
+                return;
+            } catch (Exception e) {
+                LOGGER.error("Try: {}. Unable to update CDC Stream Partition 
metadata during"
+                                + " split with daughter regions: {} {}",
+                        tries, regionInfoA.getEncodedName(), 
regionInfoB.getEncodedName(), e);
+                metricSource.incrementPostSplitPartitionUpdateFailureCount();
+                caughtException = e;
             }
-        } catch (SQLException e) {
-            LOGGER.error("Unable to update CDC Stream Partition metadata 
during split with daughter regions: {} {}",
-                    regionInfoA.getEncodedName(), 
regionInfoB.getEncodedName(), e);
-        }
+        } while (tries++ < maxRetryCount);
+        // All retries were exhausted
+        throw new IOException(
+                "Failed to update CDC Stream Partition metadata after " + 
maxRetryCount +
+                        " retries during split. Daughter regions: " +
+                        regionInfoA.getEncodedName() + " " +
+                        regionInfoB.getEncodedName(), caughtException);
     }
 
     /**
@@ -136,41 +182,67 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     @Override
     public void postCompletedMergeRegionsAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
                                                 final RegionInfo[] 
regionsToMerge,
-                                                final RegionInfo mergedRegion) 
{
+                                                final RegionInfo mergedRegion) 
throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
-        try (Connection conn  = QueryUtil.getConnectionOnServer(conf)) {
-            // CDC will be enabled on Phoenix tables only
-            PTable phoenixTable = getPhoenixTable(conn, 
mergedRegion.getTable());
-            if (phoenixTable == null) {
-                LOGGER.info("{} is not a Phoenix Table, skipping partition 
metadata update.",
-                        mergedRegion.getTable());
-                return;
-            }
-            String tableName = phoenixTable.getName().getString();
-            String streamName = getStreamName(conn, tableName);
-            if (streamName != null) {
-                LOGGER.info("Updating merged partition metadata for table={}, 
stream={} daughter {}",
-                        tableName, streamName, mergedRegion.getEncodedName());
-                // upsert a row for daughter-parent for each merged region
-                upsertDaughterPartitions(conn, tableName, streamName,
-                        
Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
-                        Arrays.asList(mergedRegion));
+        int maxRetryCount = conf.getInt(PHOENIX_MASTER_MAX_RETRY_COUNT, 
DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT);
+        int tries = 0;
+        Exception caughtException;
+        do {
+            try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
+                // CDC will be enabled on Phoenix tables only
+                PTable phoenixTable = getPhoenixTable(conn, 
mergedRegion.getTable());
+                if (phoenixTable == null) {
+                    LOGGER.debug("{} is not a Phoenix Table, skipping 
partition metadata update.",
+                            mergedRegion.getTable());
+                    return;
+                }
+                String tableName = phoenixTable.getName().getString();
+                String streamName = getStreamName(conn, tableName);
+                if (streamName != null) {
+                    LOGGER.info(
+                            "Updating merged partition metadata for table={}, 
stream={} "
+                                    + "daughter {}",
+                            tableName, streamName, 
mergedRegion.getEncodedName());
+                    // upsert a row for daughter-parent for each merged region
+                    upsertDaughterPartitions(conn, tableName, streamName,
+                            
Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
+                                    .collect(Collectors.toList()),
+                            Collections.singletonList(mergedRegion));
 
-                // lookup all ancestors of a merged region and update the 
endTime
-                for (RegionInfo ri : regionsToMerge) {
-                    List<String> ancestorIDs = getAncestorIdsForMerge(conn, 
tableName, streamName, ri);
-                    updateParentPartitionEndTime(conn, tableName, streamName, 
ancestorIDs,
-                            mergedRegion.getRegionId());
+                    // lookup all ancestors of a merged region and update the 
endTime
+                    for (RegionInfo ri : regionsToMerge) {
+                        List<String> ancestorIDs =
+                                getAncestorIdsForMerge(conn, tableName, 
streamName, ri);
+                        updateParentPartitionEndTime(conn, tableName, 
streamName, ancestorIDs,
+                                mergedRegion.getRegionId());
+                    }
+                    return;
+                } else {
+                    LOGGER.debug("{} does not have a stream enabled, skipping 
partition metadata"
+                                    + " update.", mergedRegion.getTable());
+                    return;
                 }
-            } else {
-                LOGGER.info("{} does not have a stream enabled, skipping 
partition metadata update.",
-                        mergedRegion.getTable());
+            } catch (ParentPartitionNotFound e) {
+                LOGGER.debug("Parent partition not found, skipping remaining 
steps.", e);
+                return;
+            } catch (TableNotFoundException e) {
+                LOGGER.warn("System Table not found during region merge 
completion. "
+                        + "This must happen before metadata upgrade only.", e);
+                return;
+            } catch (Exception e) {
+                LOGGER.error("Try: {}. Unable to update CDC Stream Partition 
metadata during"
+                                + " merge with parent regions: {} and daughter 
region {}",
+                        tries, regionsToMerge, mergedRegion.getEncodedName(), 
e);
+                metricSource.incrementPostMergePartitionUpdateFailureCount();
+                caughtException = e;
             }
-        } catch (SQLException e) {
-            LOGGER.error("Unable to update CDC Stream Partition metadata 
during merge with " +
-                            "parent regions: {} and daughter region {}",
-                    regionsToMerge, mergedRegion.getEncodedName(), e);
-        }
+        } while (tries++ < maxRetryCount);
+        // All retries exhausted
+        throw new IOException(
+                "Failed to update CDC Stream Partition metadata after " + 
maxRetryCount +
+                        " retries during merge with parent regions: " +
+                        Arrays.toString(regionsToMerge) +
+                        " and daughter region: " + 
mergedRegion.getEncodedName(), caughtException);
     }
 
     /**
@@ -180,8 +252,9 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
      * Return parent and all grandparent partition ids.
      *
      */
-    private List<String> getAncestorIdsForSplit(Connection conn, String 
tableName, String streamName,
-                                        RegionInfo regionInfoA, RegionInfo 
regionInfoB)
+    protected List<String> getAncestorIdsForSplit(Connection conn, String 
tableName,
+                                                  String streamName,
+                                                  RegionInfo regionInfoA, 
RegionInfo regionInfoB)
             throws SQLException {
         byte[] parentStartKey = regionInfoA.getStartKey();
         byte[] parentEndKey = regionInfoB.getEndKey();
@@ -212,8 +285,9 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
             ancestorIDs.add(rs.getString(1));
             ancestorIDs.add(rs.getString(2));
         } else {
-            throw new SQLException(String.format("Could not find parent of the 
provided daughters: "
-                            + "startKeyA=%s endKeyA=%s startKeyB=%s 
endKeyB=%s",
+            throw new ParentPartitionNotFound(
+                    String.format("Could not find parent of the provided 
daughters: "
+                                    + "startKeyA=%s endKeyA=%s startKeyB=%s 
endKeyB=%s",
                     Bytes.toStringBinary(regionInfoA.getStartKey()),
                     Bytes.toStringBinary(regionInfoA.getEndKey()),
                     Bytes.toStringBinary(regionInfoB.getStartKey()),
@@ -230,8 +304,9 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
      * Lookup the parent of a merged region.
      * If the merged region was an output of a merge in the past, it will have 
multiple parents.
      */
-    private List<String> getAncestorIdsForMerge(Connection conn, String 
tableName, String streamName,
-                                                RegionInfo parent) throws 
SQLException {
+    protected List<String> getAncestorIdsForMerge(Connection conn, String 
tableName,
+                                                  String streamName,
+                                                  RegionInfo parent) throws 
SQLException {
         List<String> ancestorIDs = new ArrayList<>();
         ancestorIDs.add(parent.getEncodedName());
         PreparedStatement pstmt = 
conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
@@ -242,8 +317,9 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
         if (rs.next()) {
             ancestorIDs.add(rs.getString(1));
         } else {
-            throw new SQLException(String.format(
-                    "Could not find parent of the provided merged region: {}", 
parent.getEncodedName()));
+            throw new ParentPartitionNotFound(String.format(
+                    "Could not find parent of the provided merged region: %s",
+                    parent.getEncodedName()));
         }
         // if parent was a result of a merge, there will be multiple 
grandparents.
         while (rs.next()) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index eede4b07b7..3fb604d8d0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TTLExpression;
 import org.apache.phoenix.schema.TableProperty;
 import org.apache.phoenix.schema.types.PBinaryBase;
 import org.apache.phoenix.schema.types.PChar;
@@ -47,6 +46,11 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -109,6 +113,8 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     protected ManualEnvironmentEdge injectEdge;
     protected Calendar cal = Calendar.getInstance();
 
+    protected static RegionCoprocessorEnvironment taskRegionEnvironment;
+
     protected void createTable(Connection conn, String table_sql)
             throws Exception {
         createTable(conn, table_sql, null, false, null, false, null);
@@ -1001,4 +1007,93 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             gen.writeEndObject();
         }
     }
+
+    /**
+     * Gets the stream name for a CDC stream.
+     * 
+     * @param conn The connection to use
+     * @param tableName The name of the table
+     * @param cdcName The name of the CDC stream
+     * @return The full stream name
+     * @throws SQLException if an error occurs
+     */
+    public String getStreamName(Connection conn, String tableName, String cdcName)
+            throws SQLException {
+        return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
+                CDCUtil.getCDCCreationTimestamp(
+                        conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+    }
+
+    /**
+     * Gets the status of a CDC stream.
+     * 
+     * @param conn The connection to use.
+     * @param tableName The name of the table.
+     * @param streamName The name of the stream.
+     * @return The stream status.
+     * @throws Exception if an error occurs.
+     */
+    public String getStreamStatus(Connection conn, String tableName, String streamName)
+            throws Exception {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT STREAM_STATUS FROM "
+                + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + tableName +
+                "' AND STREAM_NAME='" + streamName + "'");
+        assertTrue(rs.next());
+        return rs.getString(1);
+    }
+
+    /**
+     * Creates a table and enables CDC on it. This method is shared between CDCStreamIT and
+     * CDCStream2IT.
+     * 
+     * @param conn The connection to use.
+     * @param tableName The name of the table to create.
+     * @param useTaskRegionObserver Whether to use TaskRegionObserver.SelfHealingTask to enable
+     * the stream.
+     * @throws Exception if an error occurs.
+     */
+    public void createTableAndEnableCDC(Connection conn, String tableName,
+                                        boolean useTaskRegionObserver) throws Exception {
+        String cdcName = generateUniqueName();
+        String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " (k VARCHAR PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 VARCHAR)");
+        createCDC(conn, cdcSql, null);
+        String streamName = getStreamName(conn, tableName, cdcName);
+        if (useTaskRegionObserver) {
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            taskRegionEnvironment,
+                            QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+            task.run();
+        } else {
+            while (!CDCUtil.CdcStreamStatus.ENABLED.toString()
+                    .equals(getStreamStatus(conn, tableName, streamName))) {
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    /**
+     * Partition Metadata class.
+     */
+    public static class PartitionMetadata {
+        public String partitionId;
+        public String parentPartitionId;
+        public Long startTime;
+        public Long endTime;
+        public byte[] startKey;
+        public byte[] endKey;
+
+        public PartitionMetadata(ResultSet rs) throws Exception {
+            partitionId = rs.getString(3);
+            parentPartitionId = rs.getString(4);
+            startTime = rs.getLong(5);
+            endTime = rs.getLong(6);
+            startKey = rs.getBytes(7);
+            endKey = rs.getBytes(8);
+        }
+    }
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStream2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStream2IT.java
new file mode 100644
index 0000000000..50848a0091
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStream2IT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class CDCStream2IT extends CDCBaseIT {
+
+    private static final MetricsPhoenixMasterSource METRICS_SOURCE =
+            MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                Integer.toString(60 * 60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        props.put("hbase.coprocessor.master.classes", TestPhoenixMasterObserver.class.getName());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testPartitionMetadataWithRetries() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName, false);
+
+        assertEquals("Post split partition update failures should be 0 initially",
+                0, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+        assertEquals("Post merge partition update failures should be 0 initially",
+                0, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
+
+        // Perform a split operation - this will fail 24 times before succeeding
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
+
+        // Verify split metric is 24
+        assertEquals("Post split partition update failures should be 24 after retries",
+                24, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+
+        // Perform a merge operation - this will fail 15 times before succeeding
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+        // Verify merge metric is 15
+        assertEquals("Post merge partition update failures should be 15 after retries",
+                15, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
+        List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+        List<PartitionMetadata> splitParents = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.startKey == null && pm.endKey == null && pm.endTime == 0) {
+                mergedDaughter.add(pm);
+            }
+            if (pm.startKey != null || pm.endKey != null) {
+                splitParents.add(pm);
+            }
+        }
+        assertEquals(2, mergedDaughter.size());
+        assertEquals(2, splitParents.size());
+        assertEquals(mergedDaughter.get(0).startTime, mergedDaughter.get(1).startTime);
+        assertEquals(mergedDaughter.get(0).endTime, mergedDaughter.get(1).endTime);
+        assertEquals(mergedDaughter.get(0).partitionId, mergedDaughter.get(1).partitionId);
+        assertTrue(mergedDaughter.stream().anyMatch(
+                d -> Objects.equals(d.parentPartitionId, splitParents.get(0).partitionId)));
+        assertTrue(mergedDaughter.stream().anyMatch(
+                d -> Objects.equals(d.parentPartitionId, splitParents.get(1).partitionId)));
+        for (PartitionMetadata splitDaughter : splitParents) {
+            Assert.assertEquals(mergedDaughter.get(0).startTime, splitDaughter.endTime);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 6b97d118d6..72f6b50b4b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -21,7 +21,6 @@ package org.apache.phoenix.end2end;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
@@ -39,6 +38,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -57,15 +58,16 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_
 import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
-import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
 public class CDCStreamIT extends CDCBaseIT {
-    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final MetricsPhoenixMasterSource METRICS_SOURCE =
+            MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -78,7 +80,7 @@ public class CDCStreamIT extends CDCBaseIT {
                 Long.toString(Long.MAX_VALUE));
         props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-        TaskRegionEnvironment =
+        taskRegionEnvironment =
                 getUtility()
                         .getRSForFirstRegionInTable(
                                 PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
@@ -105,7 +107,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // run task to populate partitions and enable stream
         TaskRegionObserver.SelfHealingTask task =
                 new TaskRegionObserver.SelfHealingTask(
-                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+                        taskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
         task.run();
 
         // stream should be in ENABLED state and metadata is populated for every table region
@@ -140,7 +142,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // run task to populate partitions and enable stream
         TaskRegionObserver.SelfHealingTask task =
                 new TaskRegionObserver.SelfHealingTask(
-                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+                        taskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
         task.run();
 
         // stream exists in ENABLED status
@@ -173,7 +175,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         //split the only region somewhere in the middle
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
@@ -210,7 +212,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         //split the only region [null, null]
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -254,7 +256,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         //split the only region [null, null]
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -298,7 +300,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         //split the only region [null, null]
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
@@ -336,7 +338,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         //split the only region
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
@@ -386,7 +388,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         //split the only region
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -433,7 +435,7 @@ public class CDCStreamIT extends CDCBaseIT {
         // create table, cdc and bootstrap stream metadata
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         // split the only region
         TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -488,7 +490,7 @@ public class CDCStreamIT extends CDCBaseIT {
     public void testGetRecords() throws Exception {
         Connection conn = newConnection();
         String tableName = generateUniqueName();
-        createTableAndEnableCDC(conn, tableName);
+        createTableAndEnableCDC(conn, tableName, true);
 
         // upsert data
         conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('a', 1, 'foo')");
@@ -530,9 +532,33 @@ public class CDCStreamIT extends CDCBaseIT {
         }
     }
 
-    private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
-        return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
-                conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+    @Test
+    public void testPartitionUpdateFailureMetrics() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName, true);
+
+        assertEquals("Post split partition update failures should be 0 initially",
+            0, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+        assertEquals("Post merge partition update failures should be 0 initially", 
+            0, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
+
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
+
+        // Verify split metric is still 0 (successful split)
+        assertEquals("Post split partition update failures should be 0 after successful split",
+            0, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+        // Verify merge metric is still 0 (successful merge)
+        assertEquals("Post merge partition update failures should be 0 after successful merge", 
+            0, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
     }
 
     private void assertStreamStatus(Connection conn, String tableName, String streamName,
@@ -546,8 +572,7 @@ public class CDCStreamIT extends CDCBaseIT {
 
     private void assertPartitionMetadata(Connection conn, String tableName, String cdcName)
             throws SQLException {
-        String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
-                CDCUtil.getCDCCreationTimestamp(conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+        String streamName = getStreamName(conn, tableName, cdcName);
         List<HRegionLocation> tableRegions
                 = conn.unwrap(PhoenixConnection.class).getQueryServices().getAllTableRegions(tableName.getBytes());
         for (HRegionLocation tableRegion : tableRegions) {
@@ -562,39 +587,4 @@ public class CDCStreamIT extends CDCBaseIT {
         }
     }
 
-    private void createTableAndEnableCDC(Connection conn, String tableName) throws Exception {
-        String cdcName = generateUniqueName();
-        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k VARCHAR PRIMARY KEY," + " v1 INTEGER,"
-                        + " v2 VARCHAR)");
-        createCDC(conn, cdc_sql, null);
-        String streamName = getStreamName(conn, tableName, cdcName);
-        TaskRegionObserver.SelfHealingTask task =
-                new TaskRegionObserver.SelfHealingTask(
-                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
-        task.run();
-        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLED);
-    }
-
-    /**
-     * Inner class to represent partition metadata for a region i.e. single row from SYSTEM.CDC_STREAM
-     */
-    private class PartitionMetadata {
-        public String partitionId;
-        public String parentPartitionId;
-        public Long startTime;
-        public Long endTime;
-        public byte[] startKey;
-        public byte[] endKey;
-
-        public PartitionMetadata(ResultSet rs) throws SQLException {
-            partitionId = rs.getString(3);
-            parentPartitionId = rs.getString(4);
-            startTime = rs.getLong(5);
-            endTime = rs.getLong(6);
-            startKey = rs.getBytes(7);
-            endKey = rs.getBytes(8);
-        }
-    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TestPhoenixMasterObserver.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TestPhoenixMasterObserver.java
new file mode 100644
index 0000000000..91f5c08e0e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TestPhoenixMasterObserver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Test implementation of PhoenixMasterObserver that simulates failures for a specific number
+ * of retries before succeeding.
+ */
+public class TestPhoenixMasterObserver extends PhoenixMasterObserver {
+
+    private int splitFailureCount;
+    private int mergeFailureCount;
+
+    private static final int SPLIT_FAILURE_THRESHOLD = 24;
+    private static final int MERGE_FAILURE_THRESHOLD = 15;
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        super.start(env);
+        splitFailureCount = 0;
+        mergeFailureCount = 0;
+    }
+
+    @Override
+    protected List<String> getAncestorIdsForSplit(Connection conn, String tableName,
+                                                  String streamName, RegionInfo regionInfoA,
+                                                  RegionInfo regionInfoB) throws SQLException {
+        if (splitFailureCount < SPLIT_FAILURE_THRESHOLD) {
+            splitFailureCount++;
+            throw new SQLException(
+                    "Test failure for split operation, attempt " + splitFailureCount);
+        }
+        return super.getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB);
+    }
+
+    @Override
+    protected List<String> getAncestorIdsForMerge(Connection conn, String tableName,
+                                                  String streamName, RegionInfo parent)
+            throws SQLException {
+        if (mergeFailureCount < MERGE_FAILURE_THRESHOLD) {
+            mergeFailureCount++;
+            throw new SQLException(
+                    "Test failure for merge operation, attempt " + mergeFailureCount);
+        }
+        return super.getAncestorIdsForMerge(conn, tableName, streamName, parent);
+    }
+
+}
\ No newline at end of file

Reply via email to