Copilot commented on code in PR #2197:
URL: https://github.com/apache/phoenix/pull/2197#discussion_r2176091099
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java:
##########
@@ -35,11 +37,13 @@ public class MetricsReplicationLogSourceImpl extends
BaseSourceImpl
private final MutableHistogram rotationTime;
private final MutableHistogram ringBufferTime;
- public MetricsReplicationLogSourceImpl() {
- this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT);
+ public MetricsReplicationLogGroupSourceImpl(String haGroupId) {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
+ METRICS_JMX_CONTEXT + ",haGroup=" + haGroupId);
+ this.groupMetricsContext = METRICS_JMX_CONTEXT + ",haGroup=" +
haGroupId;
}
- public MetricsReplicationLogSourceImpl(String metricsName, String
metricsDescription,
+ public MetricsReplicationLogGroupSourceImpl(String metricsName, String
metricsDescription,
String metricsContext, String metricsJmxContext) {
super(metricsName, metricsDescription, metricsContext,
metricsJmxContext);
Review Comment:
The multi-argument constructor doesn’t set groupMetricsContext, so
getMetricsJmxContext() will return null. Assign `this.groupMetricsContext =
metricsJmxContext;` immediately after the `super(...)` call.
```suggestion
super(metricsName, metricsDescription, metricsContext,
metricsJmxContext);
this.groupMetricsContext = metricsJmxContext;
```
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSource;
+import
org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSourceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReplicationLogGroup manages a group of replication logs for a given HA
Group.
+ * <p>
+ * This class provides an API for replication operations and delegates to
either synchronous
+ * replication (StandbyLogGroupWriter) or store-and-forward replication
+ * (StoreAndForwardLogGroupWriter) based on the current replication mode.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li>Manages multiple replication logs for an HA Group</li>
+ * <li>Provides append() and sync() API for higher layers</li>
+ * <li>Delegates to appropriate writer implementation based on replication
mode</li>
+ * <li>Thread-safe operations</li>
+ * </ul>
+ * <p>
+ * The class delegates actual replication work to implementations of
ReplicationLogGroupWriter:
+ * <ul>
+ * <li>StandbyLogGroupWriter: Synchronous replication to standby cluster</li>
+ * <li>StoreAndForwardLogGroupWriter: Local storage with forwarding when
available</li>
+ * </ul>
+ */
+public class ReplicationLogGroup {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationLogGroup.class);
+
+ // Configuration constants from original ReplicationLog
+ public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+ "phoenix.replication.log.standby.hdfs.url";
+ public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+ "phoenix.replication.log.fallback.hdfs.url";
+ public static final String REPLICATION_NUM_SHARDS_KEY =
"phoenix.replication.log.shards";
+ public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+ public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+ public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+ "phoenix.replication.log.rotation.time.ms";
+ public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 *
1000L;
+ public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+ "phoenix.replication.log.rotation.size.bytes";
+ public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256
* 1024 * 1024L;
+ public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+ "phoenix.replication.log.rotation.size.percentage";
+ public static final double
DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+ public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+ "phoenix.replication.log.compression";
+ public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM =
"NONE";
+ public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+ "phoenix.replication.log.ringbuffer.size";
+ public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 *
32;
+ public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+ "phoenix.replication.log.sync.timeout.ms";
+ public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+ public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+ "phoenix.replication.log.sync.retries";
+ public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+ public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+ "phoenix.replication.log.rotation.retries";
+ public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+ public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+ "phoenix.replication.log.retry.delay.ms";
+ public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+ public static final String SHARD_DIR_FORMAT = "%05d";
Review Comment:
The shard directory format lacks the "shard" prefix. Change the format to
`"shard%05d"` so directories are named like `shard00001`.
```suggestion
public static final String SHARD_DIR_FORMAT = "shard%05d";
```
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store-and-forward replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class is a stub implementation for future store-and-forward
replication functionality.
+ * Store-and-forward mode is used when the standby cluster is temporarily
unavailable - mutations
+ * are stored locally and forwarded when connectivity is restored.
+ * <p>
+ * Currently this is a stub that throws UnsupportedOperationException for the
abstract methods.
+ * Future implementation will include:
+ * <ul>
+ * <li>Local storage of mutations when standby is unavailable</li>
+ * <li>Background forwarding when connectivity is restored</li>
+ * <li>Proper error handling and retry logic</li>
+ * <li>Integration with HA state management</li>
+ * <li>Dual-mode operation: local storage + forwarding</li>
+ * </ul>
+ */
+public class StoreAndForwardLogGroupWriter extends ReplicationLogGroupWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StoreAndForwardLogGroupWriter.class);
+
+ /**
+ * Constructor for StoreAndForwardLogGroupWriter.
+ */
+ public StoreAndForwardLogGroupWriter(ReplicationLogGroup logGroup) {
+ super(logGroup);
+ LOG.debug("Created StoreAndForwardLogGroupWriter for HA Group: {}",
logGroup.getHaGroupName());
+ }
+
+ @Override
+ public void init() throws IOException {
+ // TODO
Review Comment:
[nitpick] This stub `init()` method currently does nothing. Consider
throwing `UnsupportedOperationException`, or at least providing a placeholder
implementation, so the unimplemented path fails loudly instead of silently
no-opping and surfacing later as unexpected null behavior.
```suggestion
throw new UnsupportedOperationException("The init() method is not
yet implemented.");
```
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class implements synchronous replication to a standby cluster's HDFS.
It writes replication
+ * logs directly to the standby cluster in synchronous mode, providing
immediate consistency for
+ * failover scenarios.
+ */
+public class StandbyLogGroupWriter extends ReplicationLogGroupWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StandbyLogGroupWriter.class);
+
+ private FileSystem standbyFs;
+ private URI standbyUrl;
+ protected int numShards;
+ protected final ConcurrentHashMap<Path, Object> shardMap = new
ConcurrentHashMap<>();
+
+ /**
+ * Constructor for StandbyLogGroupWriter.
+ */
+ public StandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+ super(logGroup);
+ Configuration conf = logGroup.getConfiguration();
+ this.numShards =
conf.getInt(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_NUM_SHARDS);
+ LOG.debug("Created StandbyLogGroupWriter for HA Group: {}",
logGroup.getHaGroupName());
+ }
+
+ @Override
+ protected void initializeFileSystems() throws IOException {
+ if (numShards > ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS) {
+ throw new
IllegalArgumentException(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY
+ + " is " + numShards + ", but the limit is "
+ + ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS);
+ }
+ Configuration conf = logGroup.getConfiguration();
+ String standbyUrlString =
conf.get(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+ if (standbyUrlString == null || standbyUrlString.trim().isEmpty()) {
+ throw new IOException("Standby HDFS URL not configured: "
+ + ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+ }
+ try {
+ standbyUrl = new URI(standbyUrlString);
+ standbyFs = getFileSystem(standbyUrl);
+ LOG.info("Initialized standby filesystem: {}", standbyUrl);
+ } catch (URISyntaxException e) {
+ throw new IOException("Invalid standby HDFS URL: " +
standbyUrlString, e);
+ }
+ }
+
+ /**
+ * Creates a new log file path in a sharded directory structure based on
server name and
+ * timestamp. The resulting path structure is
+ * <pre>
+ * [url]/[haGroupId]/[shard]/[timestamp]-[servername].plog
+ * </pre>
+ */
+ protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+ Path haGroupPath = new Path(url.getPath(), logGroup.getHaGroupName());
+ long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+ // To have all logs for a given regionserver appear in the same shard,
hash only the
+ // serverName. However we expect some regionservers will have
significantly more load than
+ // others so we instead distribute the logs over all of the shards
randomly for a more even
+ // overall distribution by also hashing the timestamp.
+ int shard = Math.floorMod(logGroup.getServerName().hashCode() ^
Long.hashCode(timestamp),
+ numShards);
+ Path shardPath = new Path(haGroupPath,
+ String.format(ReplicationLogGroup.SHARD_DIR_FORMAT, shard));
+ // Ensure the shard directory exists. We track which shard directories
we have probed or
+ // created to avoid a round trip to the namenode for repeats.
+ IOException[] exception = new IOException[1];
+ shardMap.computeIfAbsent(shardPath, p -> {
+ try {
+ if (!fs.exists(p)) {
+ fs.mkdirs(haGroupPath); // This probably exists, but just
in case.
+ if (!fs.mkdirs(shardPath)) {
+ throw new IOException("Could not create path: " + p);
+ }
+ }
+ } catch (IOException e) {
+ exception[0] = e;
+ return null; // Don't cache the path if we can't create it.
+ }
+ return p;
+ });
+ // If we faced an exception in computeIfAbsent, throw it
+ if (exception[0] != null) {
+ throw exception[0];
+ }
+ Path filePath = new Path(shardPath,
String.format(ReplicationLogGroup.FILE_NAME_FORMAT,
+ timestamp, logGroup.getServerName()));
+ return filePath;
+ }
+
+ /** Creates and initializes a new LogFileWriter. */
+ protected LogFileWriter createNewWriter() throws IOException {
Review Comment:
This method doesn’t set the writer generation on the new LogFileWriter. To
preserve generation tracking, call
`newWriter.setGeneration(writerGeneration.incrementAndGet());` after
initializing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]