This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new e3e6a471ac PHOENIX-7639 Improve error handling in
PhoenixMasterObserver (#2191)
e3e6a471ac is described below
commit e3e6a471ac1e5c8342c7cf98cc58b8419a89f9db
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Jun 13 15:54:20 2025 -0700
PHOENIX-7639 Improve error handling in PhoenixMasterObserver (#2191)
---
.../MetricsPhoenixCoprocessorSourceFactory.java | 12 ++
.../metrics/MetricsPhoenixMasterSource.java | 60 ++++++
.../metrics/MetricsPhoenixMasterSourceImpl.java | 69 +++++++
.../phoenix/schema/ParentPartitionNotFound.java | 37 ++++
.../phoenix/coprocessor/PhoenixMasterObserver.java | 210 ++++++++++++++-------
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 97 +++++++++-
.../org/apache/phoenix/end2end/CDCStream2IT.java | 120 ++++++++++++
.../org/apache/phoenix/end2end/CDCStreamIT.java | 98 +++++-----
.../phoenix/end2end/TestPhoenixMasterObserver.java | 73 +++++++
9 files changed, 654 insertions(+), 122 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
index 280a116f68..4009a39906 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixCoprocessorSourceFactory.java
@@ -27,6 +27,7 @@ public class MetricsPhoenixCoprocessorSourceFactory {
// Holds the PHOENIX_TTL related metrics.
private static volatile MetricsPhoenixTTLSource phoenixTTLSource;
private static volatile MetricsMetadataCachingSource metadataCachingSource;
+ private static volatile MetricsPhoenixMasterSource phoenixMasterSource;
public static MetricsPhoenixCoprocessorSourceFactory getInstance() {
return INSTANCE;
@@ -54,4 +55,15 @@ public class MetricsPhoenixCoprocessorSourceFactory {
}
return INSTANCE.metadataCachingSource;
}
+
+ public MetricsPhoenixMasterSource getPhoenixMasterSource() {
+ if (INSTANCE.phoenixMasterSource == null) {
+ synchronized (MetricsPhoenixMasterSource.class) {
+ if (INSTANCE.phoenixMasterSource == null) {
+ INSTANCE.phoenixMasterSource = new
MetricsPhoenixMasterSourceImpl();
+ }
+ }
+ }
+ return INSTANCE.phoenixMasterSource;
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSource.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSource.java
new file mode 100644
index 0000000000..65d97a9e1a
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessorclient.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/**
+ * PhoenixMasterObserver metrics source.
+ */
+public interface MetricsPhoenixMasterSource extends BaseSource {
+
+ String METRICS_NAME = "PhoenixMasterObserver";
+ String METRICS_CONTEXT = "phoenix";
+ String METRICS_DESCRIPTION = "Metrics about the Phoenix Master
Coprocessor";
+ String METRICS_JMX_CONTEXT = "Master,sub=" + METRICS_NAME;
+
+ String PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES =
"phoenixPostSplitPartitionUpdateFailures";
+ String PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES_DESC =
+ "The number of failures during partition metadata updates after
region splits";
+
+ String PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES =
"phoenixPostMergePartitionUpdateFailures";
+ String PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES_DESC =
+ "The number of failures during partition metadata updates after
region merges";
+
+ /**
+ * Return the number of failures during partition metadata updates after
region splits.
+ */
+ long getPostSplitPartitionUpdateFailureCount();
+
+ /**
+ * Increment the number of failures during partition metadata updates
after region splits.
+ */
+ void incrementPostSplitPartitionUpdateFailureCount();
+
+ /**
+ * Return the number of failures during partition metadata updates after
region merges.
+ */
+ long getPostMergePartitionUpdateFailureCount();
+
+ /**
+ * Increment the number of failures during partition metadata updates
after region merges.
+ */
+ void incrementPostMergePartitionUpdateFailureCount();
+}
\ No newline at end of file
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSourceImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSourceImpl.java
new file mode 100644
index 0000000000..921e56e852
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/metrics/MetricsPhoenixMasterSourceImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessorclient.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+
+/**
+ * Implementation for tracking PhoenixMasterObserver metrics.
+ */
+public class MetricsPhoenixMasterSourceImpl extends BaseSourceImpl
+ implements MetricsPhoenixMasterSource {
+
+ private final MutableFastCounter postSplitPartitionUpdateFailures;
+ private final MutableFastCounter postMergePartitionUpdateFailures;
+
+ public MetricsPhoenixMasterSourceImpl() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT);
+ }
+
+ public MetricsPhoenixMasterSourceImpl(String metricsName, String
metricsDescription,
+ String metricsContext, String metricsJmxContext) {
+ super(metricsName, metricsDescription, metricsContext,
metricsJmxContext);
+
+ postSplitPartitionUpdateFailures = getMetricsRegistry().newCounter(
+ PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES,
+ PHOENIX_POST_SPLIT_PARTITION_UPDATE_FAILURES_DESC, 0L);
+
+ postMergePartitionUpdateFailures = getMetricsRegistry().newCounter(
+ PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES,
+ PHOENIX_POST_MERGE_PARTITION_UPDATE_FAILURES_DESC, 0L);
+ }
+
+ @Override
+ public long getPostSplitPartitionUpdateFailureCount() {
+ return postSplitPartitionUpdateFailures.value();
+ }
+
+ @Override
+ public void incrementPostSplitPartitionUpdateFailureCount() {
+ postSplitPartitionUpdateFailures.incr();
+ }
+
+ @Override
+ public long getPostMergePartitionUpdateFailureCount() {
+ return postMergePartitionUpdateFailures.value();
+ }
+
+ @Override
+ public void incrementPostMergePartitionUpdateFailureCount() {
+ postMergePartitionUpdateFailures.incr();
+ }
+}
\ No newline at end of file
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ParentPartitionNotFound.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ParentPartitionNotFound.java
new file mode 100644
index 0000000000..4779e920fc
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ParentPartitionNotFound.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema;
+
+/**
+ * Exception thrown when a parent partition cannot be found in the CDC stream
metadata.
+ * This typically occurs during region split or merge operations when trying
to update
+ * the parent-daughter relationship in the CDC stream metadata.
+ */
+public class ParentPartitionNotFound extends RuntimeException {
+
+ /**
+ * Creates a new ParentPartitionNotFound exception with the specified
error message.
+ *
+ * @param message the error message.
+ */
+ public ParentPartitionNotFound(String message) {
+ super(message);
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index e335d7604a..c3856f60a3 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -37,12 +38,17 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ParentPartitionNotFound;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import java.io.IOException;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
@@ -51,7 +57,9 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_
* Master Coprocessor for Phoenix.
*/
public class PhoenixMasterObserver implements MasterObserver,
MasterCoprocessor {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+ private MetricsPhoenixMasterSource metricSource;
private static final String STREAM_STATUS_QUERY
= "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
@@ -74,11 +82,21 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
= "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME,
STREAM_NAME, PARTITION_ID, "
+ "PARENT_PARTITION_ID, PARTITION_END_TIME) VALUES (?,?,?,?,?)";
+ public static final String PHOENIX_MASTER_MAX_RETRY_COUNT =
+ "phoenix.master.observer.max.retry.count";
+ public static final int DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT = 20;
+
@Override
public Optional<MasterObserver> getMasterObserver() {
return Optional.of(this);
}
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ metricSource =
+
MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
+ }
+
/**
* Update parent -> daughter relationship for CDC Streams when a region
splits.
* - find parent partition id using start/end keys of daughters
@@ -91,38 +109,66 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
@Override
public void postCompletedSplitRegionAction(final
ObserverContext<MasterCoprocessorEnvironment> c,
final RegionInfo regionInfoA,
- final RegionInfo regionInfoB) {
+ final RegionInfo regionInfoB)
throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
- try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
- // CDC will be enabled on Phoenix tables only
- PTable phoenixTable = getPhoenixTable(conn,
regionInfoA.getTable());
- if (phoenixTable == null) {
- LOGGER.info("{} is not a Phoenix Table, skipping partition
metadata update.",
- regionInfoA.getTable());
- return;
- }
- String tableName = phoenixTable.getName().getString();
- String streamName = getStreamName(conn, tableName);
- if (streamName != null) {
- LOGGER.info("Updating split partition metadata for table={},
stream={} daughters {} {}",
- tableName, streamName, regionInfoA.getEncodedName(),
regionInfoB.getEncodedName());
- // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
- List<String> ancestorIDs
- = getAncestorIdsForSplit(conn, tableName, streamName,
regionInfoA, regionInfoB);
+ int maxRetryCount =
+ conf.getInt(PHOENIX_MASTER_MAX_RETRY_COUNT,
DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT);
+ int tries = 0;
+ Exception caughtException;
+ do {
+ try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
+ // CDC will be enabled on Phoenix tables only
+ PTable phoenixTable = getPhoenixTable(conn,
regionInfoA.getTable());
+ if (phoenixTable == null) {
+ LOGGER.debug("{} is not a Phoenix Table, skipping
partition metadata update.",
+ regionInfoA.getTable());
+ return;
+ }
+ String tableName = phoenixTable.getName().getString();
+ String streamName = getStreamName(conn, tableName);
+ if (streamName != null) {
+ LOGGER.info(
+ "Updating split partition metadata for table={},
stream={} "
+ + "daughters {} {}",
+ tableName, streamName,
regionInfoA.getEncodedName(),
+ regionInfoB.getEncodedName());
+ // ancestorIDs = [parentId, grandparentId1,
grandparentId2...]
+ List<String> ancestorIDs
+ = getAncestorIdsForSplit(conn, tableName,
streamName, regionInfoA,
+ regionInfoB);
- upsertDaughterPartitions(conn, tableName, streamName,
ancestorIDs.subList(0, 1),
- Arrays.asList(regionInfoA, regionInfoB));
+ upsertDaughterPartitions(conn, tableName, streamName,
ancestorIDs.subList(0, 1),
+ Arrays.asList(regionInfoA, regionInfoB));
- updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs,
- regionInfoA.getRegionId());
- } else {
- LOGGER.info("{} does not have a stream enabled, skipping
partition metadata update.",
- regionInfoA.getTable());
+ updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs,
+ regionInfoA.getRegionId());
+ return;
+ } else {
+ LOGGER.debug("{} does not have a stream enabled, skipping
partition metadata"
+ + " update.", regionInfoA.getTable());
+ return;
+ }
+ } catch (ParentPartitionNotFound e) {
+ LOGGER.debug("Parent partition not found, skipping remaining
steps.", e);
+ return;
+ } catch (TableNotFoundException e) {
+ LOGGER.warn("System Table not found during region split
completion. "
+ + "This must happen before metadata upgrade only.", e);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Try: {}. Unable to update CDC Stream Partition
metadata during"
+ + " split with daughter regions: {} {}",
+ tries, regionInfoA.getEncodedName(),
regionInfoB.getEncodedName(), e);
+ metricSource.incrementPostSplitPartitionUpdateFailureCount();
+ caughtException = e;
}
- } catch (SQLException e) {
- LOGGER.error("Unable to update CDC Stream Partition metadata
during split with daughter regions: {} {}",
- regionInfoA.getEncodedName(),
regionInfoB.getEncodedName(), e);
- }
+ } while (tries++ < maxRetryCount);
+ // All retries were exhausted
+ throw new IOException(
+ "Failed to update CDC Stream Partition metadata after " +
maxRetryCount +
+ " retries during split. Daughter regions: " +
+ regionInfoA.getEncodedName() + " " +
+ regionInfoB.getEncodedName(), caughtException);
}
/**
@@ -136,41 +182,67 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
@Override
public void postCompletedMergeRegionsAction(final
ObserverContext<MasterCoprocessorEnvironment> c,
final RegionInfo[]
regionsToMerge,
- final RegionInfo mergedRegion)
{
+ final RegionInfo mergedRegion)
throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
- try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
- // CDC will be enabled on Phoenix tables only
- PTable phoenixTable = getPhoenixTable(conn,
mergedRegion.getTable());
- if (phoenixTable == null) {
- LOGGER.info("{} is not a Phoenix Table, skipping partition
metadata update.",
- mergedRegion.getTable());
- return;
- }
- String tableName = phoenixTable.getName().getString();
- String streamName = getStreamName(conn, tableName);
- if (streamName != null) {
- LOGGER.info("Updating merged partition metadata for table={},
stream={} daughter {}",
- tableName, streamName, mergedRegion.getEncodedName());
- // upsert a row for daughter-parent for each merged region
- upsertDaughterPartitions(conn, tableName, streamName,
-
Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
- Arrays.asList(mergedRegion));
+ int maxRetryCount = conf.getInt(PHOENIX_MASTER_MAX_RETRY_COUNT,
DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT);
+ int tries = 0;
+ Exception caughtException;
+ do {
+ try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
+ // CDC will be enabled on Phoenix tables only
+ PTable phoenixTable = getPhoenixTable(conn,
mergedRegion.getTable());
+ if (phoenixTable == null) {
+ LOGGER.debug("{} is not a Phoenix Table, skipping
partition metadata update.",
+ mergedRegion.getTable());
+ return;
+ }
+ String tableName = phoenixTable.getName().getString();
+ String streamName = getStreamName(conn, tableName);
+ if (streamName != null) {
+ LOGGER.info(
+ "Updating merged partition metadata for table={},
stream={} "
+ + "daughter {}",
+ tableName, streamName,
mergedRegion.getEncodedName());
+ // upsert a row for daughter-parent for each merged region
+ upsertDaughterPartitions(conn, tableName, streamName,
+
Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
+ .collect(Collectors.toList()),
+ Collections.singletonList(mergedRegion));
- // lookup all ancestors of a merged region and update the
endTime
- for (RegionInfo ri : regionsToMerge) {
- List<String> ancestorIDs = getAncestorIdsForMerge(conn,
tableName, streamName, ri);
- updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs,
- mergedRegion.getRegionId());
+ // lookup all ancestors of a merged region and update the
endTime
+ for (RegionInfo ri : regionsToMerge) {
+ List<String> ancestorIDs =
+ getAncestorIdsForMerge(conn, tableName,
streamName, ri);
+ updateParentPartitionEndTime(conn, tableName,
streamName, ancestorIDs,
+ mergedRegion.getRegionId());
+ }
+ return;
+ } else {
+ LOGGER.debug("{} does not have a stream enabled, skipping
partition metadata"
+ + " update.", mergedRegion.getTable());
+ return;
}
- } else {
- LOGGER.info("{} does not have a stream enabled, skipping
partition metadata update.",
- mergedRegion.getTable());
+ } catch (ParentPartitionNotFound e) {
+ LOGGER.debug("Parent partition not found, skipping remaining
steps.", e);
+ return;
+ } catch (TableNotFoundException e) {
+ LOGGER.warn("System Table not found during region merge
completion. "
+ + "This must happen before metadata upgrade only.", e);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Try: {}. Unable to update CDC Stream Partition
metadata during"
+ + " merge with parent regions: {} and daughter
region {}",
+ tries, regionsToMerge, mergedRegion.getEncodedName(),
e);
+ metricSource.incrementPostMergePartitionUpdateFailureCount();
+ caughtException = e;
}
- } catch (SQLException e) {
- LOGGER.error("Unable to update CDC Stream Partition metadata
during merge with " +
- "parent regions: {} and daughter region {}",
- regionsToMerge, mergedRegion.getEncodedName(), e);
- }
+ } while (tries++ < maxRetryCount);
+ // All retries exhausted
+ throw new IOException(
+ "Failed to update CDC Stream Partition metadata after " +
maxRetryCount +
+ " retries during merge with parent regions: " +
+ Arrays.toString(regionsToMerge) +
+ " and daughter region: " +
mergedRegion.getEncodedName(), caughtException);
}
/**
@@ -180,8 +252,9 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
* Return parent and all grandparent partition ids.
*
*/
- private List<String> getAncestorIdsForSplit(Connection conn, String
tableName, String streamName,
- RegionInfo regionInfoA, RegionInfo
regionInfoB)
+ protected List<String> getAncestorIdsForSplit(Connection conn, String
tableName,
+ String streamName,
+ RegionInfo regionInfoA,
RegionInfo regionInfoB)
throws SQLException {
byte[] parentStartKey = regionInfoA.getStartKey();
byte[] parentEndKey = regionInfoB.getEndKey();
@@ -212,8 +285,9 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
ancestorIDs.add(rs.getString(1));
ancestorIDs.add(rs.getString(2));
} else {
- throw new SQLException(String.format("Could not find parent of the
provided daughters: "
- + "startKeyA=%s endKeyA=%s startKeyB=%s
endKeyB=%s",
+ throw new ParentPartitionNotFound(
+ String.format("Could not find parent of the provided
daughters: "
+ + "startKeyA=%s endKeyA=%s startKeyB=%s
endKeyB=%s",
Bytes.toStringBinary(regionInfoA.getStartKey()),
Bytes.toStringBinary(regionInfoA.getEndKey()),
Bytes.toStringBinary(regionInfoB.getStartKey()),
@@ -230,8 +304,9 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
* Lookup the parent of a merged region.
* If the merged region was an output of a merge in the past, it will have
multiple parents.
*/
- private List<String> getAncestorIdsForMerge(Connection conn, String
tableName, String streamName,
- RegionInfo parent) throws
SQLException {
+ protected List<String> getAncestorIdsForMerge(Connection conn, String
tableName,
+ String streamName,
+ RegionInfo parent) throws
SQLException {
List<String> ancestorIDs = new ArrayList<>();
ancestorIDs.add(parent.getEncodedName());
PreparedStatement pstmt =
conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
@@ -242,8 +317,9 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
if (rs.next()) {
ancestorIDs.add(rs.getString(1));
} else {
- throw new SQLException(String.format(
- "Could not find parent of the provided merged region: {}",
parent.getEncodedName()));
+ throw new ParentPartitionNotFound(String.format(
+ "Could not find parent of the provided merged region: %s",
+ parent.getEncodedName()));
}
// if parent was a result of a merge, there will be multiple
grandparents.
while (rs.next()) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index eede4b07b7..3fb604d8d0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TTLExpression;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.types.PBinaryBase;
import org.apache.phoenix.schema.types.PChar;
@@ -47,6 +46,11 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import java.io.IOException;
import java.sql.Connection;
@@ -109,6 +113,8 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
protected ManualEnvironmentEdge injectEdge;
protected Calendar cal = Calendar.getInstance();
+ protected static RegionCoprocessorEnvironment taskRegionEnvironment;
+
protected void createTable(Connection conn, String table_sql)
throws Exception {
createTable(conn, table_sql, null, false, null, false, null);
@@ -1001,4 +1007,93 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
gen.writeEndObject();
}
}
+
+ /**
+ * Gets the stream name for a CDC stream.
+ *
+ * @param conn The connection to use
+ * @param tableName The name of the table
+ * @param cdcName The name of the CDC stream
+ * @return The full stream name
+ * @throws SQLException if an error occurs
+ */
+ public String getStreamName(Connection conn, String tableName, String
cdcName)
+ throws SQLException {
+ return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
+ CDCUtil.getCDCCreationTimestamp(
+
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+ }
+
+ /**
+ * Gets the status of a CDC stream.
+ *
+ * @param conn The connection to use.
+ * @param tableName The name of the table.
+ * @param streamName The name of the stream.
+ * @return The stream status.
+ * @throws Exception if an error occurs.
+ */
+ public String getStreamStatus(Connection conn, String tableName, String
streamName)
+ throws Exception {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT
STREAM_STATUS FROM "
+ + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" +
tableName +
+ "' AND STREAM_NAME='" + streamName + "'");
+ assertTrue(rs.next());
+ return rs.getString(1);
+ }
+
+ /**
+ * Creates a table and enables CDC on it. This method is shared between
CDCStreamIT and
+ * CDCStream2IT.
+ *
+ * @param conn The connection to use.
+ * @param tableName The name of the table to create.
+ * @param useTaskRegionObserver Whether to use
TaskRegionObserver.SelfHealingTask to enable
+ * the stream.
+ * @throws Exception if an error occurs.
+ */
+ public void createTableAndEnableCDC(Connection conn, String tableName,
+ boolean useTaskRegionObserver) throws
Exception {
+ String cdcName = generateUniqueName();
+ String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (k VARCHAR PRIMARY KEY," + "
v1 INTEGER,"
+ + " v2 VARCHAR)");
+ createCDC(conn, cdcSql, null);
+ String streamName = getStreamName(conn, tableName, cdcName);
+ if (useTaskRegionObserver) {
+ TaskRegionObserver.SelfHealingTask task =
+ new TaskRegionObserver.SelfHealingTask(
+ taskRegionEnvironment,
+
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ task.run();
+ } else {
+ while (!CDCUtil.CdcStreamStatus.ENABLED.toString()
+ .equals(getStreamStatus(conn, tableName, streamName))) {
+ Thread.sleep(1000);
+ }
+ }
+ }
+
+ /**
+ * Partition Metadata class.
+ */
+ public static class PartitionMetadata {
+ public String partitionId;
+ public String parentPartitionId;
+ public Long startTime;
+ public Long endTime;
+ public byte[] startKey;
+ public byte[] endKey;
+
+ public PartitionMetadata(ResultSet rs) throws Exception {
+ partitionId = rs.getString(3);
+ parentPartitionId = rs.getString(4);
+ startTime = rs.getLong(5);
+ endTime = rs.getLong(6);
+ startKey = rs.getBytes(7);
+ endKey = rs.getBytes(8);
+ }
+ }
+
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStream2IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStream2IT.java
new file mode 100644
index 0000000000..50848a0091
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStream2IT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class CDCStream2IT extends CDCBaseIT {
+
+ private static final MetricsPhoenixMasterSource METRICS_SOURCE =
+
MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = new HashMap<>();
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(60 * 60)); // An hour
+ props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(false));
+ props.put("hbase.coprocessor.master.classes",
TestPhoenixMasterObserver.class.getName());
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testPartitionMetadataWithRetries() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName, false);
+
+ assertEquals("Post split partition update failures should be 0
initially",
+ 0, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+ assertEquals("Post merge partition update failures should be 0
initially",
+ 0, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
+
+ // Perform a split operation - this will fail 24 times before
succeeding
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
+
+ // Verify split metric is 24
+ assertEquals("Post split partition update failures should be 24 after
retries",
+ 24, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+
+ List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn,
tableName);
+
+ // Perform a merge operation - this will fail 15 times before
succeeding
+ TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getEncodedName)
+ .collect(Collectors.toList()));
+
+ // Verify merge metric is 15
+ assertEquals("Post merge partition update failures should be 15 after
retries",
+ 15, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
+
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+ List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+ List<PartitionMetadata> splitParents = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.startKey == null && pm.endKey == null && pm.endTime == 0) {
+ mergedDaughter.add(pm);
+ }
+ if (pm.startKey != null || pm.endKey != null) {
+ splitParents.add(pm);
+ }
+ }
+ assertEquals(2, mergedDaughter.size());
+ assertEquals(2, splitParents.size());
+ assertEquals(mergedDaughter.get(0).startTime,
mergedDaughter.get(1).startTime);
+ assertEquals(mergedDaughter.get(0).endTime,
mergedDaughter.get(1).endTime);
+ assertEquals(mergedDaughter.get(0).partitionId,
mergedDaughter.get(1).partitionId);
+ assertTrue(mergedDaughter.stream().anyMatch(
+ d -> Objects.equals(d.parentPartitionId,
splitParents.get(0).partitionId)));
+ assertTrue(mergedDaughter.stream().anyMatch(
+ d -> Objects.equals(d.parentPartitionId,
splitParents.get(1).partitionId)));
+ for (PartitionMetadata splitDaughter : splitParents) {
+ Assert.assertEquals(mergedDaughter.get(0).startTime,
splitDaughter.endTime);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 6b97d118d6..72f6b50b4b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -21,7 +21,6 @@ package org.apache.phoenix.end2end;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
@@ -39,6 +38,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
+import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -57,15 +58,16 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_
import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
-import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
public class CDCStreamIT extends CDCBaseIT {
- private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final MetricsPhoenixMasterSource METRICS_SOURCE =
+
MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -78,7 +80,7 @@ public class CDCStreamIT extends CDCBaseIT {
Long.toString(Long.MAX_VALUE));
props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
- TaskRegionEnvironment =
+ taskRegionEnvironment =
getUtility()
.getRSForFirstRegionInTable(
PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
@@ -105,7 +107,7 @@ public class CDCStreamIT extends CDCBaseIT {
// run task to populate partitions and enable stream
TaskRegionObserver.SelfHealingTask task =
new TaskRegionObserver.SelfHealingTask(
- TaskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ taskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
task.run();
// stream should be in ENABLED state and metadata is populated for
every table region
@@ -140,7 +142,7 @@ public class CDCStreamIT extends CDCBaseIT {
// run task to populate partitions and enable stream
TaskRegionObserver.SelfHealingTask task =
new TaskRegionObserver.SelfHealingTask(
- TaskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ taskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
task.run();
// stream exists in ENABLED status
@@ -173,7 +175,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
//split the only region somewhere in the middle
TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
@@ -210,7 +212,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
//split the only region [null, null]
TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -254,7 +256,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
//split the only region [null, null]
TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -298,7 +300,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
//split the only region [null, null]
TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
@@ -336,7 +338,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
//split the only region
TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
@@ -386,7 +388,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
//split the only region
TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -433,7 +435,7 @@ public class CDCStreamIT extends CDCBaseIT {
// create table, cdc and bootstrap stream metadata
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
// split the only region
TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
@@ -488,7 +490,7 @@ public class CDCStreamIT extends CDCBaseIT {
public void testGetRecords() throws Exception {
Connection conn = newConnection();
String tableName = generateUniqueName();
- createTableAndEnableCDC(conn, tableName);
+ createTableAndEnableCDC(conn, tableName, true);
// upsert data
conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a', 1, 'foo')");
@@ -530,9 +532,33 @@ public class CDCStreamIT extends CDCBaseIT {
}
}
- private String getStreamName(Connection conn, String tableName, String
cdcName) throws SQLException {
- return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
CDCUtil.getCDCCreationTimestamp(
-
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+ @Test
+ public void testPartitionUpdateFailureMetrics() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName, true);
+
+ assertEquals("Post split partition update failures should be 0
initially",
+ 0, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+ assertEquals("Post merge partition update failures should be 0
initially",
+ 0, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
+
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
+
+ // Verify split metric is still 0 (successful split)
+ assertEquals("Post split partition update failures should be 0 after
successful split",
+ 0, METRICS_SOURCE.getPostSplitPartitionUpdateFailureCount());
+
+ List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn,
tableName);
+
+ TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getEncodedName)
+ .collect(Collectors.toList()));
+
+ // Verify merge metric is still 0 (successful merge)
+ assertEquals("Post merge partition update failures should be 0 after
successful merge",
+ 0, METRICS_SOURCE.getPostMergePartitionUpdateFailureCount());
}
private void assertStreamStatus(Connection conn, String tableName, String
streamName,
@@ -546,8 +572,7 @@ public class CDCStreamIT extends CDCBaseIT {
private void assertPartitionMetadata(Connection conn, String tableName,
String cdcName)
throws SQLException {
- String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName,
cdcName,
-
CDCUtil.getCDCCreationTimestamp(conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+ String streamName = getStreamName(conn, tableName, cdcName);
List<HRegionLocation> tableRegions
=
conn.unwrap(PhoenixConnection.class).getQueryServices().getAllTableRegions(tableName.getBytes());
for (HRegionLocation tableRegion : tableRegions) {
@@ -562,39 +587,4 @@ public class CDCStreamIT extends CDCBaseIT {
}
}
- private void createTableAndEnableCDC(Connection conn, String tableName)
throws Exception {
- String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k VARCHAR PRIMARY KEY," + "
v1 INTEGER,"
- + " v2 VARCHAR)");
- createCDC(conn, cdc_sql, null);
- String streamName = getStreamName(conn, tableName, cdcName);
- TaskRegionObserver.SelfHealingTask task =
- new TaskRegionObserver.SelfHealingTask(
- TaskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
- task.run();
- assertStreamStatus(conn, tableName, streamName,
CDCUtil.CdcStreamStatus.ENABLED);
- }
-
- /**
- * Inner class to represent partition metadata for a region i.e. single
row from SYSTEM.CDC_STREAM
- */
- private class PartitionMetadata {
- public String partitionId;
- public String parentPartitionId;
- public Long startTime;
- public Long endTime;
- public byte[] startKey;
- public byte[] endKey;
-
- public PartitionMetadata(ResultSet rs) throws SQLException {
- partitionId = rs.getString(3);
- parentPartitionId = rs.getString(4);
- startTime = rs.getLong(5);
- endTime = rs.getLong(6);
- startKey = rs.getBytes(7);
- endKey = rs.getBytes(8);
- }
- }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TestPhoenixMasterObserver.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TestPhoenixMasterObserver.java
new file mode 100644
index 0000000000..91f5c08e0e
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TestPhoenixMasterObserver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Test implementation of PhoenixMasterObserver that simulates failures for a
specific number
+ * of retries before succeeding.
+ */
+public class TestPhoenixMasterObserver extends PhoenixMasterObserver {
+
+ private int splitFailureCount;
+ private int mergeFailureCount;
+
+ private static final int SPLIT_FAILURE_THRESHOLD = 24;
+ private static final int MERGE_FAILURE_THRESHOLD = 15;
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ super.start(env);
+ splitFailureCount = 0;
+ mergeFailureCount = 0;
+ }
+
+ @Override
+ protected List<String> getAncestorIdsForSplit(Connection conn, String
tableName,
+ String streamName,
RegionInfo regionInfoA,
+ RegionInfo regionInfoB)
throws SQLException {
+ if (splitFailureCount < SPLIT_FAILURE_THRESHOLD) {
+ splitFailureCount++;
+ throw new SQLException(
+ "Test failure for split operation, attempt " +
splitFailureCount);
+ }
+ return super.getAncestorIdsForSplit(conn, tableName, streamName,
regionInfoA, regionInfoB);
+ }
+
+ @Override
+ protected List<String> getAncestorIdsForMerge(Connection conn, String
tableName,
+ String streamName,
RegionInfo parent)
+ throws SQLException {
+ if (mergeFailureCount < MERGE_FAILURE_THRESHOLD) {
+ mergeFailureCount++;
+ throw new SQLException(
+ "Test failure for merge operation, attempt " +
mergeFailureCount);
+ }
+ return super.getAncestorIdsForMerge(conn, tableName, streamName,
parent);
+ }
+
+}
\ No newline at end of file