This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
     new 20c2c2431f PHOENIX-7640 Refactor ReplicationLog for HA Groups (#2197)
20c2c2431f is described below

commit 20c2c2431f6278db6d31c9290f7765c56094e8fe
Author: Andrew Purtell <[email protected]>
AuthorDate: Tue Jul 1 08:39:49 2025 -0700

    PHOENIX-7640 Refactor ReplicationLog for HA Groups (#2197)
---
 .../phoenix/replication/ReplicationLogGroup.java   | 419 ++++++++++++++
 ...tionLog.java => ReplicationLogGroupWriter.java} | 549 +++++-------------
 .../phoenix/replication/StandbyLogGroupWriter.java | 138 +++++
 .../replication/StoreAndForwardLogGroupWriter.java |  76 +++
 ....java => MetricsReplicationLogGroupSource.java} |  14 +-
 ...a => MetricsReplicationLogGroupSourceImpl.java} |  27 +-
 ...onLogTest.java => ReplicationLogGroupTest.java} | 614 +++++++++------------
 7 files changed, 1044 insertions(+), 793 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
new file mode 100644
index 0000000000..0c3eeacfd9
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSourceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReplicationLogGroup manages a group of replication logs for a given HA Group.
+ * <p>
+ * This class provides an API for replication operations and delegates to either synchronous
+ * replication (StandbyLogGroupWriter) or store-and-forward replication
+ * (StoreAndForwardLogGroupWriter) based on the current replication mode.
+ * <p>
+ * Key features:
+ * <ul>
+ *   <li>Manages multiple replication logs for an HA Group</li>
+ *   <li>Provides append() and sync() API for higher layers</li>
+ *   <li>Delegates to appropriate writer implementation based on replication mode</li>
+ *   <li>Thread-safe operations</li>
+ * </ul>
+ * <p>
+ * The class delegates actual replication work to implementations of ReplicationLogGroupWriter:
+ * <ul>
+ *   <li>StandbyLogGroupWriter: Synchronous replication to standby cluster</li>
+ *   <li>StoreAndForwardLogGroupWriter: Local storage with forwarding when available</li>
+ * </ul>
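+ * <p>
+ * Typical usage from a server-side caller (illustrative sketch only; the configuration,
+ * server name, and HA Group name would come from the hosting environment):
+ * <pre>
+ * ReplicationLogGroup group = ReplicationLogGroup.get(conf, serverName, haGroupName);
+ * group.append(tableName, commitId, mutation); // non-blocking unless the ring buffer is full
+ * group.sync(); // blocks until all previously appended records are durable
+ * </pre>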
+ */
+public class ReplicationLogGroup {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroup.class);
+
+    // Configuration constants from original ReplicationLog
+    public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+        "phoenix.replication.log.standby.hdfs.url";
+    public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+        "phoenix.replication.log.fallback.hdfs.url";
+    public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
+    public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+    public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+    public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+        "phoenix.replication.log.rotation.time.ms";
+    public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
+    public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+        "phoenix.replication.log.rotation.size.bytes";
+    public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
+    public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+        "phoenix.replication.log.rotation.size.percentage";
+    public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+    public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+        "phoenix.replication.log.compression";
+    public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
+    public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+        "phoenix.replication.log.ringbuffer.size";
+    public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32;
+    public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+        "phoenix.replication.log.sync.timeout.ms";
+    public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+    public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+        "phoenix.replication.log.sync.retries";
+    public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+    public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+        "phoenix.replication.log.rotation.retries";
+    public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+    public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+        "phoenix.replication.log.retry.delay.ms";
+    public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+    public static final String SHARD_DIR_FORMAT = "%05d";
+    public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+    /** Cache of ReplicationLogGroup instances by HA Group ID */
+    protected static final ConcurrentHashMap<String, ReplicationLogGroup> INSTANCES =
+        new ConcurrentHashMap<>();
+
+    protected final Configuration conf;
+    protected final ServerName serverName;
+    protected final String haGroupName;
+    protected ReplicationLogGroupWriter remoteWriter;
+    protected ReplicationLogGroupWriter localWriter;
+    protected ReplicationMode mode;
+    protected volatile boolean closed = false;
+    protected final MetricsReplicationLogGroupSource metrics;
+
+    /**
+     * Tracks the current replication mode of the ReplicationLogGroup.
+     * <p>
+     * The replication mode determines how mutations are handled:
+     * <ul>
+     *   <li>SYNC: Normal operation where mutations are written directly to the standby cluster's
+     *   HDFS.
+     *   This is the default and primary mode of operation.</li>
+     *   <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable.
+     *   Mutations are stored locally and will be forwarded when connectivity is restored.</li>
+     *   <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the
+     *   standby cluster while concurrently draining the local queue of previously stored
+     *   mutations.</li>
+     * </ul>
+     * <p>
+     * Mode transitions occur automatically based on the availability of the standby cluster's HDFS
+     * and the state of the local mutation queue.
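+     * <p>
+     * The expected transition cycle (a sketch based on the mode descriptions above): SYNC
+     * falls back to STORE_AND_FORWARD when the standby HDFS becomes unavailable; when
+     * connectivity is restored with entries still queued locally, the log enters
+     * SYNC_AND_FORWARD; once the local queue is drained it returns to SYNC.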
+     */
+    protected enum ReplicationMode {
+        /**
+         * Normal operation where mutations are written directly to the standby cluster's HDFS.
+         * This is the default and primary mode of operation.
+         */
+        SYNC,
+
+        /**
+         * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored
+         * locally and will be forwarded when connectivity is restored.
+         */
+        STORE_AND_FORWARD,
+
+        /**
+         * Transitional mode where new mutations are written directly to the standby cluster
+         * while concurrently draining the local queue of previously stored mutations. This mode
+         * is entered when connectivity to the standby cluster is restored and there are still
+         * mutations in the local queue.
+         */
+        SYNC_AND_FORWARD;
+    }
+
+    /**
+     * Get or create a ReplicationLogGroup instance for the given HA Group.
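+     * <p>
+     * Instances are cached by HA Group name, so repeated calls with the same name return the
+     * same instance until {@link #close()} removes it from the cache.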
+     *
+     * @param conf Configuration object
+     * @param serverName The server name
+     * @param haGroupName The HA Group name
+     * @return ReplicationLogGroup instance
+     * @throws RuntimeException if initialization fails
+     */
+    public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
+            String haGroupName) {
+        return INSTANCES.computeIfAbsent(haGroupName, k -> {
+            try {
+                ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
+                group.init();
+                return group;
+            } catch (IOException e) {
+                LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Protected constructor for ReplicationLogGroup.
+     *
+     * @param conf Configuration object
+     * @param serverName The server name
+     * @param haGroupName The HA Group name
+     */
+    protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName) {
+        this.conf = conf;
+        this.serverName = serverName;
+        this.haGroupName = haGroupName;
+        this.metrics = createMetricsSource();
+    }
+
+    /**
+     * Initialize the ReplicationLogGroup by creating the appropriate writer implementation.
+     *
+     * @throws IOException if initialization fails
+     */
+    protected void init() throws IOException {
+        // We need the local writer created first if we intend to fall back to it should the init
+        // of the remote writer fail.
+        localWriter = createLocalWriter();
+        // Initialize the remote writer and set the mode to SYNC. TODO: switch instead of set
+        mode = ReplicationMode.SYNC;
+        remoteWriter = createRemoteWriter();
+        // TODO: Switch the initial mode to STORE_AND_FORWARD if the remote writer fails to
+        // initialize.
+        LOG.info("Started ReplicationLogGroup for HA Group: {}", haGroupName);
+    }
+
+    /**
+     * Get the name for this HA Group.
+     *
+     * @return The name for this HA Group
+     */
+    public String getHaGroupName() {
+        return haGroupName;
+    }
+
+    protected Configuration getConfiguration() {
+        return conf;
+    }
+
+    protected ServerName getServerName() {
+        return serverName;
+    }
+
+    /**
+     * Append a mutation to the replication log group. This operation is normally non-blocking
+     * unless the ring buffer is full.
+     *
+     * @param tableName The name of the HBase table the mutation applies to
+     * @param commitId The commit identifier (e.g., SCN) associated with the mutation
+     * @param mutation The HBase Mutation (Put or Delete) to be logged
+     * @throws IOException If the operation fails
+     */
+    public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+        if (closed) {
+            throw new IOException("Closed");
+        }
+        long startTime = System.nanoTime();
+        try {
+            switch (mode) {
+            case SYNC:
+                // In sync mode, we only write to the remote writer.
+                try {
+                    remoteWriter.append(tableName, commitId, mutation);
+                } catch (IOException e) {
+                    // TODO: If the remote writer fails, we must switch to store and forward.
+                    LOG.warn("Mode switching not implemented");
+                    throw e;
+                }
+                break;
+            case SYNC_AND_FORWARD:
+                // In sync and forward mode, we write to only the remote writer, while in the
+                // background we are draining the local queue.
+                try {
+                    remoteWriter.append(tableName, commitId, mutation);
+                } catch (IOException e) {
+                    // TODO: If the remote writer fails again, we must switch back to store and
+                    // forward.
+                    LOG.warn("Mode switching not implemented");
+                    throw e;
+                }
+                break;
+            case STORE_AND_FORWARD:
+                // In store and forward mode, we append to the local writer. If we fail it's a
+                // critical failure.
+                localWriter.append(tableName, commitId, mutation);
+                // TODO: Probe the state of the remoteWriter. Can we switch back?
+                // TODO: This suggests the ReplicationLogGroupWriter interface should have a status
+                // probe API.
+                break;
+            default:
+                throw new IllegalStateException("Invalid replication mode: " + mode);
+            }
+        } finally {
+            metrics.updateAppendTime(System.nanoTime() - startTime);
+        }
+    }
+
+    /**
+     * Ensure all previously appended records are durably persisted. This method blocks until the
+     * sync operation completes or fails.
+     *
+     * @throws IOException If the sync operation fails
+     */
+    public void sync() throws IOException {
+        if (closed) {
+            throw new IOException("Closed");
+        }
+        long startTime = System.nanoTime();
+        try {
+            switch (mode) {
+            case SYNC:
+                // In sync mode, we only write to the remote writer.
+                try {
+                    remoteWriter.sync();
+                } catch (IOException e) {
+                    // TODO: If the remote writer fails, we must switch to store and forward.
+                    LOG.warn("Mode switching not implemented");
+                    throw e;
+                }
+                break;
+            case SYNC_AND_FORWARD:
+                // In sync and forward mode, we write to only the remote writer, while in the
+                // background we are draining the local queue.
+                try {
+                    remoteWriter.sync();
+                } catch (IOException e) {
+                    // TODO: If the remote writer fails again, we must switch back to store and
+                    // forward.
+                    LOG.warn("Mode switching not implemented");
+                    throw e;
+                }
+                break;
+            case STORE_AND_FORWARD:
+                // In store and forward mode, we sync the local writer. If we fail it's a critical
+                // failure.
+                localWriter.sync();
+                // TODO: Probe the state of the remoteWriter. Can we switch back?
+                // TODO: This suggests the ReplicationLogGroupWriter interface should have a
+                // status probe API.
+                break;
+            default:
+                throw new IllegalStateException("Invalid replication mode: " + mode);
+            }
+        } finally {
+            metrics.updateSyncTime(System.nanoTime() - startTime);
+        }
+    }
+
+    /**
+     * Check if this ReplicationLogGroup is closed.
+     *
+     * @return true if closed, false otherwise
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Close the ReplicationLogGroup and all associated resources. This method is thread-safe and
+     * can be called multiple times.
+     */
+    public void close() {
+        if (closed) {
+            return;
+        }
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            // Remove from instances cache
+            INSTANCES.remove(haGroupName);
+            // Close the writers, remote first. If there are any problems closing the remote writer
+            // the pending writes will be sent to the local writer instead, during the appropriate
+            // mode switch.
+            closeWriter(remoteWriter);
+            closeWriter(localWriter);
+            metrics.close();
+            LOG.info("Closed ReplicationLogGroup for HA Group: {}", haGroupName);
+        }
+    }
+
+    /**
+     * Switch the replication mode.
+     *
+     * @param mode The new replication mode
+     * @param reason The reason for the mode switch
+     * @throws IOException If the mode switch fails
+     */
+    public void switchMode(ReplicationMode mode, Throwable reason) throws IOException {
+        // TODO: Implement mode switching guardrails and transition logic.
+        // TODO: We will be interacting with the HA Group Store to switch modes.
+
+        // TODO: Drain the disruptor ring from the remote writer to the local writer when making
+        // transitions from SYNC or SYNC_AND_FORWARD to STORE_AND_FORWARD.
+
+        throw new UnsupportedOperationException("Mode switching is not implemented");
+    }
+
+    /** Get the current metrics source for monitoring operations. */
+    public MetricsReplicationLogGroupSource getMetrics() {
+        return metrics;
+    }
+
+    /** Create a new metrics source for monitoring operations. */
+    protected MetricsReplicationLogGroupSource createMetricsSource() {
+        return new MetricsReplicationLogGroupSourceImpl(haGroupName);
+    }
+
+    /** Close the given writer. */
+    protected void closeWriter(ReplicationLogGroupWriter writer) {
+        if (writer != null) {
+            writer.close();
+        }
+    }
+
+    /** Create the remote (synchronous) writer. Mainly for tests. */
+    protected ReplicationLogGroupWriter createRemoteWriter() throws IOException {
+        ReplicationLogGroupWriter writer = new StandbyLogGroupWriter(this);
+        writer.init();
+        return writer;
+    }
+
+    /** Create the local (store and forward) writer. Mainly for tests. */
+    protected ReplicationLogGroupWriter createLocalWriter() throws IOException {
+        ReplicationLogGroupWriter writer = new StoreAndForwardLogGroupWriter(this);
+        writer.init();
+        return writer;
+    }
+
+    /** Returns the currently active writer. Mainly for tests. */
+    protected ReplicationLogGroupWriter getActiveWriter() {
+        switch (mode) {
+        case SYNC:
+            return remoteWriter;
+        case SYNC_AND_FORWARD:
+            return remoteWriter;
+        case STORE_AND_FORWARD:
+            return localWriter;
+        default:
+            throw new IllegalStateException("Invalid replication mode: " + mode);
+        }
+    }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
similarity index 60%
rename from phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
rename to phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
index 15c237e519..6262e75a10 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
@@ -20,11 +20,9 @@ package org.apache.phoenix.replication;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -35,15 +33,9 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.phoenix.replication.log.LogFileWriter;
-import org.apache.phoenix.replication.log.LogFileWriterContext;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
@@ -58,42 +50,17 @@ import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
 /**
- * The ReplicationLog implements a high-performance logging system for mutations that need to be
- * replicated to another cluster.
+ * Base class for replication log group writers.
  * <p>
- * Key features:
- * <ul>
- *   <li>Asynchronous append operations with batching for high throughput</li>
- *   <li>Controlled blocking sync operations with timeout and retry logic</li>
- *   <li>Automatic log rotation based on time and size thresholds</li>
- *   <li>Sharded directory structure for better HDFS performance</li>
- *   <li>Metrics for monitoring</li>
- *   <li>Fail-stop behavior on critical errors</li>
- * </ul>
- * <p>
- * The class supports three replication modes (though currently only SYNC is implemented):
- * <ul>
- *   <li>SYNC: Direct writes to standby cluster (default)</li>
- *   <li>STORE_AND_FORWARD: Local storage when standby is unavailable</li>
- *   <li>SYNC_AND_FORWARD: Concurrent direct writes and queue draining</li>
- * </ul>
- * <p>
- * Key configuration properties:
- * <ul>
- *   <li>{@link #REPLICATION_STANDBY_HDFS_URL_KEY}: URL for standby cluster HDFS</li>
- *   <li>{@link #REPLICATION_FALLBACK_HDFS_URL_KEY}: URL for local fallback storage</li>
- *   <li>{@link #REPLICATION_NUM_SHARDS_KEY}: Number of shard directories</li>
- *   <li>{@link #REPLICATION_LOG_ROTATION_TIME_MS_KEY}: Time-based rotation interval</li>
- *   <li>{@link #REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY}: Size-based rotation threshold</li>
- *   <li>{@link #REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY}: Compression algorithm</li>
- * </ul>
- * <p>
- * This class is intended to be thread-safe.
+ * This abstract class contains most of the common functionality for managing replication logs
+ * including the disruptor ring buffer, log rotation, file system management, and metrics.
+ * Concrete implementations provide specific replication behavior (synchronous vs store-and-
+ * forward).
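+ * <p>
+ * A minimal subclass (an illustrative sketch only, not part of this commit) supplies the two
+ * abstract pieces, file system setup and writer creation:
+ * <pre>
+ * class ExampleLogGroupWriter extends ReplicationLogGroupWriter {
+ *     ExampleLogGroupWriter(ReplicationLogGroup group) { super(group); }
+ *     protected void initializeFileSystems() throws IOException {
+ *         // resolve and validate the target file system here
+ *     }
+ *     protected LogFileWriter createNewWriter() throws IOException {
+ *         return null; // create and init a LogFileWriter for the target path
+ *     }
+ * }
+ * </pre>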
  * <p>
  * Architecture Overview:
  * <pre>
  * ┌──────────────────────────────────────────────────────────────────────┐
- * │                           ReplicationLog                             │
+ * │                       ReplicationLogGroup                            │
  * │                                                                      │
  * │  ┌─────────────┐     ┌────────────────────────────────────────────┐  │
  * │  │             │     │                                            │  │
@@ -138,132 +105,26 @@ import com.lmax.disruptor.dsl.ProducerType;
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2",
     "MS_EXPOSE_REP" }, justification = "Intentional")
-public class ReplicationLog {
-
-    /** The path on the standby HDFS where log files should be written. (The "IN" directory.) */
-    public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
-        "phoenix.replication.log.standby.hdfs.url";
-    /**
-     * The path on the active HDFS where log files should be written when we have fallen back to
-     * store and forward mode. (The "OUT" directory.)
-     */
-    public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
-        "phoenix.replication.log.fallback.hdfs.url";
-    /**
-     * The number of shards (subfolders) to maintain in the "IN" directory.
-     * <p>
-     * Shard directories have the format shard-NNNNN, e.g. shard-00001. The maximum value is
-     * 100000.
-     */
-    public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
-    public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
-    public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
-    /** Replication log rotation time trigger, default is 1 minute */
-    public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
-        "phoenix.replication.log.rotation.time.ms";
-    public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
-    /** Replication log rotation size trigger, default is 256 MB */
-    public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
-        "phoenix.replication.log.rotation.size.bytes";
-    public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
-    /** Replication log rotation size trigger percentage, default is 0.95 */
-    public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
-        "phoenix.replication.log.rotation.size.percentage";
-    public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
-    /** Replication log compression, default is "NONE" */
-    public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
-        "phoenix.replication.log.compression";
-    public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
-    public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
-        "phoenix.replication.log.ringbuffer.size";
-    public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32;  // Too big?
-    public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
-        "phoenix.replication.log.sync.timeout.ms";
-    public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
-    public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
-        "phoenix.replication.log.sync.retries";
-    public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
-    public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
-        "phoenix.replication.log.rotation.retries";
-    public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
-    public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
-        "phoenix.replication.log.retry.delay.ms";
-    public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
-
-    public static final String SHARD_DIR_FORMAT = "shard%05d";
-    public static final String FILE_NAME_FORMAT = "%d-%s.plog";
-
-    static final byte EVENT_TYPE_DATA = 0;
-    static final byte EVENT_TYPE_SYNC = 1;
-
-    static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+public abstract class ReplicationLogGroupWriter {
 
-    protected static volatile ReplicationLog instance;
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupWriter.class);
 
-    protected final Configuration conf;
-    protected final ServerName serverName;
-    protected FileSystem standbyFs;
-    protected FileSystem fallbackFs; // For store-and-forward (future use)
-    protected int numShards;
-    protected URI standbyUrl;
-    protected URI fallbackUrl; // For store-and-forward (future use)
+    protected final ReplicationLogGroup logGroup;
     protected final long rotationTimeMs;
     protected final long rotationSizeBytes;
     protected final int maxRotationRetries;
     protected final Compression.Algorithm compression;
+    protected final int ringBufferSize;
+    protected final long syncTimeoutMs;
     protected final ReentrantLock lock = new ReentrantLock();
-    protected volatile LogFileWriter currentWriter; // Current writer
+    protected volatile LogFileWriter currentWriter;
     protected final AtomicLong lastRotationTime = new AtomicLong();
     protected final AtomicLong writerGeneration = new AtomicLong();
     protected final AtomicLong rotationFailures = new AtomicLong(0);
     protected ScheduledExecutorService rotationExecutor;
-    protected final int ringBufferSize;
-    protected final long syncTimeoutMs;
     protected Disruptor<LogEvent> disruptor;
     protected RingBuffer<LogEvent> ringBuffer;
-    protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
-    protected final MetricsReplicationLogSource metrics;
-    protected volatile boolean isClosed = false;
-
-    /**
-     * Tracks the current replication mode of the ReplicationLog.
-     * <p>
-     * The replication mode determines how mutations are handled:
-     * <ul>
-     *   <li>SYNC: Normal operation where mutations are written directly to the standby cluster's
-     *   HDFS.
-     *   This is the default and primary mode of operation.</li>
-     *   <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable.
-     *   Mutations are stored locally and will be forwarded when connectivity is restored.</li>
-     *   <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the
-     *   standby cluster while concurrently draining the local queue of previously stored
-     *   mutations.</li>
-     * </ul>
-     * <p>
-     * Mode transitions occur automatically based on the availability of the standby cluster's HDFS
-     * and the state of the local mutation queue.
-     */
-    protected enum ReplicationMode {
-        /**
-         * Normal operation where mutations are written directly to the standby cluster's HDFS.
-         * This is the default and primary mode of operation.
-         */
-        SYNC,
-
-        /**
-         * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored
-         * locally and will be forwarded when connectivity is restored.
-         */
-        STORE_AND_FORWARD,
-
-        /**
-         * Transitional mode where new mutations are written directly to the standby cluster
-         * while concurrently draining the local queue of previously stored mutations. This mode
-         * is entered when connectivity to the standby cluster is restored while there are still
-         * mutations in the local queue.
-         */
-        SYNC_AND_FORWARD;
-    }
+    protected volatile boolean closed = false;
 
     /** The reason for requesting a log rotation. */
     protected enum RotationReason {
@@ -275,88 +136,31 @@ public class ReplicationLog {
         ERROR;
     }
 
-    /**
-     * The current replication mode. Always SYNC for now.
-     * <p>TODO: Implement mode transitions to STORE_AND_FORWARD when standby becomes unavailable.
-     * <p>TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
-     */
-    protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
-
-    // TODO: Add configuration keys for store-and-forward behavior
-    // - Maximum retry attempts before switching to store-and-forward
-    // - Retry delay between attempts
-    // - Queue drain batch size
-    // - Queue drain interval
-
-    // TODO: Add state tracking fields
-    // - Queue of pending changes when in store-and-forward mode
-    // - Timestamp of last successful standby write
-    // - Error count for tracking consecutive failures
-
-    // TODO: Add methods for state transitions
-    // - switchToStoreAndForward() - Called when standby becomes unavailable
-    // - switchToSync() - Called when standby becomes available again
-    // - drainQueue() - Background task to process queued changes
-
-    // TODO: Enhance error handling in LogEventHandler
-    // - Track consecutive failures
-    // - Switch to store-and-forward after max retries
-
-    // TODO: Implement queue management for store-and-forward mode
-    // - Implement queue persistence to handle RegionServer restarts
-    // - Implement queue draining when in SYNC_AND_FORWARD state
-    // - Implement automatic recovery from temporary network issues
-    // - Add configurable thresholds for switching to store-and-forward based on write latency
-    // - Add circuit breaker pattern to prevent overwhelming the standby cluster
-    // - Add queue size limits and backpressure mechanisms
-    // - Add queue metrics for monitoring (queue size, oldest entry age, etc.)
-
-    // TODO: Enhance metrics for replication health monitoring
-    // - Add metrics for replication lag between active and standby
-    // - Track time spent in each replication mode (SYNC, STORE_AND_FORWARD, SYNC_AND_FORWARD)
-    // - Monitor queue drain rate and backlog size
-    // - Track consecutive failures and mode transition events
-
-    /**
-     * Gets the singleton instance of the ReplicationLogManager using the lazy initializer pattern.
-     * Initializes the instance if it hasn't been created yet.
-     * @param conf Configuration object.
-     * @param serverName The server name.
-     * @return The singleton ReplicationLogManager instance.
-     * @throws IOException If initialization fails.
-     */
-    public static ReplicationLog get(Configuration conf, ServerName serverName)
-          throws IOException {
-        if (instance == null) {
-            synchronized (ReplicationLog.class) {
-                if (instance == null) {
-                    // Complete initialization before assignment
-                    ReplicationLog logManager = new ReplicationLog(conf, serverName);
-                    logManager.init();
-                    instance = logManager;
-                }
-            }
-        }
-        return instance;
-    }
-
-    protected ReplicationLog(Configuration conf, ServerName serverName) {
-        this.conf = conf;
-        this.serverName = serverName;
-        this.rotationTimeMs = conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY,
-            DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
-        long rotationSize = conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
-            DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
-        double rotationSizePercent = conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
-            DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+    protected static final byte EVENT_TYPE_DATA = 0;
+    protected static final byte EVENT_TYPE_SYNC = 1;
+
+    protected ReplicationLogGroupWriter(ReplicationLogGroup logGroup) {
+        this.logGroup = logGroup;
+        Configuration conf = logGroup.getConfiguration();
+        this.rotationTimeMs =
+            conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+        long rotationSize =
+            conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+        double rotationSizePercent =
+            conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
         this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
-        this.maxRotationRetries = conf.getInt(REPLICATION_LOG_ROTATION_RETRIES_KEY,
-            DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
-        this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, DEFAULT_REPLICATION_NUM_SHARDS);
-        String compressionName = conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
-            DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+        this.maxRotationRetries =
+            conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
+        String compressionName =
+            conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
         Compression.Algorithm compression = Compression.Algorithm.NONE;
-        if (!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName)) {
+        if (!compressionName.equals(
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM)) {
             try {
                 compression = Compression.getCompressionAlgorithmByName(compressionName);
             } catch (IllegalArgumentException e) {
@@ -364,51 +168,21 @@ public class ReplicationLog {
             }
         }
         this.compression = compression;
-        this.ringBufferSize = conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
-            DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
-        this.syncTimeoutMs = conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY,
-            DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
-        this.metrics = createMetricsSource();
+        this.ringBufferSize = conf.getInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
+            ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+        this.syncTimeoutMs = conf.getLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
+            ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
     }
 
-    /** Creates a new metrics source for monitoring replication log operations. */
-    protected MetricsReplicationLogSource createMetricsSource() {
-        return new MetricsReplicationLogSourceImpl();
-    }
-
-    /** Returns the metrics source for monitoring replication log operations. */
-    public MetricsReplicationLogSource getMetrics() {
-        return metrics;
-    }
-
-    @SuppressWarnings("unchecked")
+    /** Initialize the writer. */
     public void init() throws IOException {
-        if (numShards > MAX_REPLICATION_NUM_SHARDS) {
-            throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + numShards
-                + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
-        }
         initializeFileSystems();
         // Start time based rotation.
         lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
         startRotationExecutor();
-        // Create the initial writer. Do this before we call LogEventHandler.init().
-        currentWriter = createNewWriter(standbyFs, standbyUrl);
-        // Initialize the Disruptor. We use ProducerType.MULTI because multiple handlers might
-        // call append concurrently. We use YieldingWaitStrategy for low latency. When the ring
-        // buffer is full (controlled by REPLICATION_WRITER_RINGBUFFER_SIZE_KEY), producers
-        // calling ringBuffer.next() will effectively block (by yielding/spinning), creating
-        // backpressure on the callers. This ensures appends don't proceed until there is space.
-        disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
-            new ThreadFactoryBuilder().setNameFormat("ReplicationLogEventHandler-%d")
-                .setDaemon(true).build(),
-            ProducerType.MULTI, new YieldingWaitStrategy());
-        LogEventHandler eventHandler = new LogEventHandler();
-        eventHandler.init();
-        disruptor.handleEventsWith(eventHandler);
-        LogExceptionHandler exceptionHandler = new LogExceptionHandler();
-        disruptor.setDefaultExceptionHandler(exceptionHandler);
-        ringBuffer = disruptor.start();
-        LOG.info("ReplicationLogWriter started with ring buffer size {}", ringBufferSize);
+        // Create the initial writer. Do this before we initialize the Disruptor.
+        currentWriter = createNewWriter();
+        initializeDisruptor();
     }
 
     /**
@@ -431,10 +205,9 @@ public class ReplicationLog {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
         }
-        if (isClosed) {
+        if (closed) {
             throw new IOException("Closed");
         }
-        long startTime = System.nanoTime();
         // ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
         // with ProducerType.MULTI and the blocking YieldingWaitStrategy this call WILL BLOCK if
         // the ring buffer is full, thus providing backpressure to the callers.
@@ -442,7 +215,6 @@ public class ReplicationLog {
         try {
             LogEvent event = ringBuffer.get(sequence);
             event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, mutation), null);
-            metrics.updateAppendTime(System.nanoTime() - startTime);
         } finally {
             // Update ring buffer events metric
             ringBuffer.publish(sequence);
@@ -465,18 +237,44 @@ public class ReplicationLog {
      * @throws IOException If the sync operation fails after retries, or if interrupted.
      */
     public void sync() throws IOException {
-        if (isClosed) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Sync");
+        }
+        if (closed) {
             throw new IOException("Closed");
         }
         syncInternal();
     }
 
+    /** Initialize file systems needed by this writer implementation. */
+    protected abstract void initializeFileSystems() throws IOException;
+
+    /**
+     * Create a new log writer for rotation.
+     */
+    protected abstract LogFileWriter createNewWriter() throws IOException;
+
+    /** Initialize the Disruptor. */
+    @SuppressWarnings("unchecked")
+    protected void initializeDisruptor() throws IOException {
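+        // We use ProducerType.MULTI because multiple handlers may call append() concurrently,
+        // and YieldingWaitStrategy for low latency; when the ring buffer is full, producers
+        // claiming a sequence effectively block, which applies backpressure to callers.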
+        disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
+            new ThreadFactoryBuilder()
+                .setNameFormat("ReplicationLogGroupWriter-" + 
logGroup.getHaGroupName() + "-%d")
+                .setDaemon(true).build(),
+            ProducerType.MULTI, new YieldingWaitStrategy());
+        LogEventHandler eventHandler = new LogEventHandler();
+        eventHandler.init();
+        disruptor.handleEventsWith(eventHandler);
+        LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+        disruptor.setDefaultExceptionHandler(exceptionHandler);
+        ringBuffer = disruptor.start();
+    }
+
     /**
      * Internal implementation of sync that publishes a sync event to the ring buffer and waits
      * for completion.
      */
     protected void syncInternal() throws IOException {
-        long startTime = System.nanoTime();
         CompletableFuture<Void> syncFuture = new CompletableFuture<>();
         long sequence = ringBuffer.next();
         try {
@@ -489,10 +287,7 @@ public class ReplicationLog {
         try {
             // Wait for the event handler to process up to and including this sync event
             syncFuture.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
-            metrics.updateSyncTime(System.nanoTime() - startTime);
         } catch (InterruptedException e) {
-            // Almost certainly the regionserver is shutting down or aborting.
-            // TODO: Do we need to do more here?
             Thread.currentThread().interrupt();
             throw new InterruptedIOException("Interrupted while waiting for sync");
         } catch (ExecutionException e) {
@@ -509,90 +304,33 @@ public class ReplicationLog {
         }
     }
 
-    /** Initializes the standby and fallback filesystems and creates their log directories. */
-    protected void initializeFileSystems() throws IOException {
-        String standbyUrlString = conf.get(REPLICATION_STANDBY_HDFS_URL_KEY);
-        if (standbyUrlString == null) {
-            throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not configured");
-        }
-        // Only validate that the URI is well formed. We should not assume the scheme must be
-        // "hdfs" because perhaps the operator will substitute another FileSystem implementation
-        // for DistributedFileSystem.
-        try {
-            this.standbyUrl = new URI(standbyUrlString);
-        } catch (URISyntaxException e) {
-            throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not valid", e);
-        }
-        String fallbackUrlString = conf.get(REPLICATION_FALLBACK_HDFS_URL_KEY);
-        if (fallbackUrlString != null) {
-            // Only validate that the URI is well formed, as above.
-            try {
-                this.fallbackUrl = new URI(fallbackUrlString);
-            } catch (URISyntaxException e) {
-                throw new IOException(REPLICATION_FALLBACK_HDFS_URL_KEY + " is not valid", e);
-            }
-            this.fallbackFs = getFileSystem(fallbackUrl);
-            Path fallbackLogDir = new Path(fallbackUrl.getPath());
-            if (!fallbackFs.exists(fallbackLogDir)) {
-                LOG.info("Creating directory {}", fallbackUrlString);
-                if (!this.fallbackFs.mkdirs(fallbackLogDir)) {
-                    throw new IOException("Failed to create directory: " + 
fallbackUrlString);
-                }
-            }
-        } else {
-            // We support a synchronous replication only option if store-and-forward configuration
-            // keys are missing. This is outside the scope of the design spec but potentially
-            // useful for testing and also allows an operator to prefer failover consistency and
-            // simplicity over availability, even if that is not recommended. Log it at WARN level
-            // to focus appropriate attention. (Should it be ERROR?)
-            LOG.warn("Fallback not configured ({}), store-and-forward 
DISABLED.",
-                REPLICATION_FALLBACK_HDFS_URL_KEY);
-            this.fallbackFs = null;
-        }
-        // Configuration is sorted, and possibly store-and-forward directories have been created,
-        // now create the standby side directories as needed.
-        this.standbyFs = getFileSystem(standbyUrl);
-        Path standbyLogDir = new Path(standbyUrl.getPath());
-        if (!standbyFs.exists(standbyLogDir)) {
-            LOG.info("Creating directory {}", standbyUrlString);
-            if (!standbyFs.mkdirs(standbyLogDir)) {
-                throw new IOException("Failed to create directory: " + 
standbyUrlString);
-            }
-        }
-    }
-
-    /** Gets a FileSystem instance for the given URI using the current configuration. */
-    protected FileSystem getFileSystem(URI uri) throws IOException {
-        return FileSystem.get(uri, conf);
+    protected void startRotationExecutor() {
+        long rotationCheckInterval = getRotationCheckInterval(rotationTimeMs);
+        rotationExecutor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setNameFormat("ReplicationLogRotation-" + 
logGroup.getHaGroupName() + "-%d")
+                .setDaemon(true).build());
+        rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), 
rotationCheckInterval,
+            rotationCheckInterval, TimeUnit.MILLISECONDS);
+        LOG.debug("Started rotation executor with interval {}ms", 
rotationCheckInterval);
     }
 
-    /** Calculates the interval for checking log rotation based on the configured rotation time. */
     protected long getRotationCheckInterval(long rotationTimeMs) {
-        long interval;
-        if (rotationTimeMs > 0) {
-            interval = rotationTimeMs / 4;
-        } else {
-            // If rotation time is not configured or invalid, use a sensible default like 10 seconds
-            interval = 10000L;
-        }
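+        // Check at one tenth of the configured rotation period, clamped to [10s, 60s].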
+        long interval = Math.max(10 * 1000L, Math.min(60 * 1000L, rotationTimeMs / 10));
         return interval;
     }
 
-    /** Starts the background task for time-based log rotation. */
-    protected void startRotationExecutor() {
-        Preconditions.checkState(rotationExecutor == null, "Rotation executor already started");
-        rotationExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
-            .setNameFormat("ReplicationLogRotator-%d").setDaemon(true).build());
-        long interval = getRotationCheckInterval(rotationTimeMs);
-        rotationExecutor.scheduleWithFixedDelay(new LogRotationTask(), interval, interval,
-            TimeUnit.MILLISECONDS);
-    }
-
-    /** Stops the background task for time-based log rotation. */
     protected void stopRotationExecutor() {
         if (rotationExecutor != null) {
-            rotationExecutor.shutdownNow();
-            rotationExecutor = null;
+            rotationExecutor.shutdown();
+            try {
+                if (!rotationExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                    rotationExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                rotationExecutor.shutdownNow();
+            }
         }
     }
 
@@ -669,7 +407,7 @@ public class ReplicationLog {
         try {
             // Try to get the new writer first. If it fails we continue using the current writer.
             // Increment the writer generation
-            LogFileWriter newWriter = createNewWriter(standbyFs, standbyUrl);
+            LogFileWriter newWriter = createNewWriter();
             LOG.debug("Created new writer: {}", newWriter);
             // Close the current writer
             if (currentWriter != null) {
@@ -679,23 +417,23 @@ public class ReplicationLog {
             currentWriter = newWriter;
             lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
             rotationFailures.set(0);
-            metrics.incrementRotationCount();
+            logGroup.getMetrics().incrementRotationCount();
             switch (reason) {
             case TIME:
-                metrics.incrementTimeBasedRotationCount();
+                logGroup.getMetrics().incrementTimeBasedRotationCount();
                 break;
             case SIZE:
-                metrics.incrementSizeBasedRotationCount();
+                logGroup.getMetrics().incrementSizeBasedRotationCount();
                 break;
             case ERROR:
-                metrics.incrementErrorBasedRotationCount();
+                logGroup.getMetrics().incrementErrorBasedRotationCount();
                 break;
             }
         } catch (IOException e) {
             // If we fail to rotate the log, we increment the failure counter. If we have exceeded
             // the maximum number of retries, we close the log and throw the exception. Otherwise
             // we log a warning and continue.
-            metrics.incrementRotationFailureCount();
+            logGroup.getMetrics().incrementRotationFailureCount();
             long numFailures = rotationFailures.getAndIncrement();
             if (numFailures >= maxRotationRetries) {
                 LOG.warn("Failed to rotate log (attempt {}/{}), closing log", 
numFailures,
@@ -711,57 +449,6 @@ public class ReplicationLog {
         return currentWriter;
     }
 
-    /**
-     * Creates a new log file path in a sharded directory structure based on server name and
-     * timestamp.
-     */
-    protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
-        long timestamp = EnvironmentEdgeManager.currentTimeMillis();
-        // To have all logs for a given regionserver appear in the same shard, hash only the
-        // serverName. However we expect some regionservers will have significantly more load than
-        // others so we instead distribute the logs over all of the shards randomly for a more even
-        // overall distribution by also hashing the timestamp.
-        int shard = (serverName.hashCode() ^ Long.hashCode(timestamp)) % numShards;
-        Path shardPath = new Path(url.getPath(), String.format(SHARD_DIR_FORMAT, shard));
-        // Ensure the shard directory exists. We track which shard directories we have probed or
-        // created to avoid a round trip to the namenode for repeats.
-        IOException[] exception = new IOException[1];
-        shardMap.computeIfAbsent(shardPath, p -> {
-            try {
-                if (!fs.exists(p)) {
-                    if (!fs.mkdirs(p)) {
-                        throw new IOException("Could not create path: " + p);
-                    }
-                }
-            } catch (IOException e) {
-                exception[0] = e;
-            }
-            return p;
-        });
-        // If we faced an exception in computeIfAbsent, throw it
-        if (exception[0] != null) {
-            throw exception[0];
-        }
-        Path filePath = new Path(shardPath, String.format(FILE_NAME_FORMAT, timestamp, serverName));
-        return filePath;
-    }
-
-    /** Creates and initializes a new LogFileWriter for the given filesystem and URL. */
-    protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws IOException {
-        Path filePath = makeWriterPath(fs, url);
-        LogFileWriterContext writerContext = new LogFileWriterContext(conf).setFileSystem(fs)
-            .setFilePath(filePath).setCompression(compression);
-        LogFileWriter newWriter = new LogFileWriter();
-        try {
-            newWriter.init(writerContext);
-            newWriter.setGeneration(writerGeneration.incrementAndGet());
-        } catch (IOException e) {
-            LOG.error("Failed to initialize new LogFileWriter for path {}", 
filePath, e);
-            throw e;
-        }
-        return newWriter;
-    }
-
     /** Closes the given writer, logging any errors that occur during close. */
     protected void closeWriter(LogFileWriter writer) {
         if (writer == null) {
@@ -775,6 +462,15 @@ public class ReplicationLog {
         }
     }
 
+    /**
+     * Check if this writer is closed.
+     *
+     * @return true if closed, false otherwise
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
     /**
      * Force closes the log upon an unrecoverable internal error. This is a fail-stop behavior:
      * once called, the log is marked as closed, the Disruptor is halted, and all subsequent
@@ -784,10 +480,10 @@ public class ReplicationLog {
     protected void closeOnError() {
         lock.lock();
         try {
-            if (isClosed) {
+            if (closed) {
                 return;
             }
-            isClosed = true;
+            closed = true;
         } finally {
             lock.unlock();
         }
@@ -804,10 +500,10 @@ public class ReplicationLog {
     public void close() {
         lock.lock();
         try {
-            if (isClosed) {
+            if (closed) {
                 return;
             }
-            isClosed = true;
+            closed = true;
         } finally {
             lock.unlock();
         }
@@ -825,11 +521,15 @@ public class ReplicationLog {
         closeWriter(currentWriter);
     }
 
+    protected FileSystem getFileSystem(URI uri) throws IOException {
+        return FileSystem.get(uri, logGroup.getConfiguration());
+    }
+
     /** Implements time based rotation independent of in-line checking. */
     protected class LogRotationTask implements Runnable {
         @Override
         public void run() {
-            if (isClosed) {
+            if (closed) {
                 return;
             }
             // Use tryLock with a timeout to avoid blocking indefinitely if another thread holds
@@ -842,7 +542,7 @@ public class ReplicationLog {
                     // Check only the time condition here, size is handled by getWriter
                     long now = EnvironmentEdgeManager.currentTimeMillis();
                     long last = lastRotationTime.get();
-                    if (!isClosed && now - last >= rotationTimeMs) {
+                    if (!closed && now - last >= rotationTimeMs) {
                         LOG.debug("Time based rotation needed ({} ms elapsed, 
threshold {} ms).",
                               now - last, rotationTimeMs);
                         try {
@@ -901,7 +601,7 @@ public class ReplicationLog {
     * Handles events from the Disruptor, managing batching, writer rotation, and error handling.
      */
     protected class LogEventHandler implements EventHandler<LogEvent> {
-        protected final int maxRetries; // Configurable max retries for sync
+        protected final int maxRetries;    // Configurable max retries for sync
        protected final long retryDelayMs; // Configurable delay between retries
        protected final List<Record> currentBatch = new ArrayList<>();
        protected final List<CompletableFuture<Void>> pendingSyncFutures = new ArrayList<>();
@@ -909,10 +609,12 @@ public class ReplicationLog {
         protected long generation;
 
         protected LogEventHandler() {
-            this.maxRetries = conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY,
-                DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
-            this.retryDelayMs = conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY,
-                DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
+            Configuration conf = logGroup.getConfiguration();
+            this.maxRetries = conf.getInt(ReplicationLogGroup.REPLICATION_LOG_SYNC_RETRIES_KEY,
+                ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
+            this.retryDelayMs =
+                conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
+                    ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
         }
 
         protected void init() throws IOException {
@@ -1007,7 +709,7 @@ public class ReplicationLog {
             // Calculate time spent in ring buffer
             long currentTimeNs = System.nanoTime();
             long ringBufferTimeNs = currentTimeNs - event.timestampNs;
-            metrics.updateRingBufferTime(ringBufferTimeNs);
+            logGroup.getMetrics().updateRingBufferTime(ringBufferTimeNs);
             writer = getWriter();
             int attempt = 0;
             while (attempt < maxRetries) {
@@ -1093,5 +795,4 @@ public class ReplicationLog {
             closeOnError();
         }
     }
-
 }
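
A minimal usage sketch of the refactored API may help orient review. The
append(), sync(), init(), and close() calls are the ones exercised by the test
further below; the public constructor shape is an assumption taken from the
test subclass (Configuration, ServerName, HA group name), and all values are
placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class ReplicationLogGroupUsageSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            ServerName serverName = ServerName.valueOf("rs1.example.com", 60020, 1L);
            // Constructor arguments assumed from TestableLogGroup in the test below.
            ReplicationLogGroup logGroup = new ReplicationLogGroup(conf, serverName, "haGroup1");
            logGroup.init();
            try {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                logGroup.append("MY_TABLE", 1L, put); // enqueue onto the ring buffer
                logGroup.sync(); // block until the batch is durable, or time out
            } finally {
                logGroup.close();
            }
        }
    }
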
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
new file mode 100644
index 0000000000..d9520e27fc
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class implements synchronous replication to a standby cluster's HDFS. It writes replication
+ * logs directly to the standby cluster in synchronous mode, providing immediate consistency for
+ * failover scenarios.
+ */
+public class StandbyLogGroupWriter extends ReplicationLogGroupWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandbyLogGroupWriter.class);
+
+    private FileSystem standbyFs;
+    private URI standbyUrl;
+    protected int numShards;
+    protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor for StandbyLogGroupWriter.
+     */
+    public StandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+        super(logGroup);
+        Configuration conf = logGroup.getConfiguration();
+        this.numShards = conf.getInt(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY,
+            ReplicationLogGroup.DEFAULT_REPLICATION_NUM_SHARDS);
+        LOG.debug("Created StandbyLogGroupWriter for HA Group: {}", logGroup.getHaGroupName());
+    }
+
+    @Override
+    protected void initializeFileSystems() throws IOException {
+        if (numShards > ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS) {
+            throw new IllegalArgumentException(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY
+                + " is " + numShards + ", but the limit is "
+                + ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS);
+        }
+        Configuration conf = logGroup.getConfiguration();
+        String standbyUrlString = conf.get(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+        if (standbyUrlString == null || standbyUrlString.trim().isEmpty()) {
+            throw new IOException("Standby HDFS URL not configured: "
+                + ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+        }
+        try {
+            standbyUrl = new URI(standbyUrlString);
+            standbyFs = getFileSystem(standbyUrl);
+            LOG.info("Initialized standby filesystem: {}", standbyUrl);
+        } catch (URISyntaxException e) {
+            throw new IOException("Invalid standby HDFS URL: " + 
standbyUrlString, e);
+        }
+    }
+
+    /**
+     * Creates a new log file path in a sharded directory structure based on server name and
+     * timestamp. The resulting path structure is
+     * <pre>
+     * [url]/[haGroupId]/[shard]/[timestamp]-[servername].plog
+     * </pre>
+     */
+    protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+        Path haGroupPath = new Path(url.getPath(), logGroup.getHaGroupName());
+        long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+        // Hashing only the serverName would keep all logs for a given regionserver in the
+        // same shard. However, we expect some regionservers to carry significantly more load
+        // than others, so we also hash the timestamp to distribute the logs randomly over all
+        // of the shards for a more even overall distribution.
+        int shard = Math.floorMod(logGroup.getServerName().hashCode() ^ Long.hashCode(timestamp),
+            numShards);
+        Path shardPath = new Path(haGroupPath,
+            String.format(ReplicationLogGroup.SHARD_DIR_FORMAT, shard));
+        // Ensure the shard directory exists. We track which shard directories we have probed or
+        // created to avoid a round trip to the namenode for repeats.
+        IOException[] exception = new IOException[1];
+        shardMap.computeIfAbsent(shardPath, p -> {
+            try {
+                if (!fs.exists(p)) {
+                    fs.mkdirs(haGroupPath); // This probably exists, but just in case.
+                    if (!fs.mkdirs(shardPath)) {
+                        throw new IOException("Could not create path: " + p);
+                    }
+                }
+            } catch (IOException e) {
+                exception[0] = e;
+                return null; // Don't cache the path if we can't create it.
+            }
+            return p;
+        });
+        // If we faced an exception in computeIfAbsent, throw it
+        if (exception[0] != null) {
+            throw exception[0];
+        }
+        Path filePath = new Path(shardPath, String.format(ReplicationLogGroup.FILE_NAME_FORMAT,
+            timestamp, logGroup.getServerName()));
+        return filePath;
+    }
+
+    /** Creates and initializes a new LogFileWriter. */
+    protected LogFileWriter createNewWriter() throws IOException {
+        Path filePath = makeWriterPath(standbyFs, standbyUrl);
+        LogFileWriterContext writerContext = new LogFileWriterContext(logGroup.getConfiguration())
+            .setFileSystem(standbyFs)
+            .setFilePath(filePath).setCompression(compression);
+        LogFileWriter newWriter = new LogFileWriter();
+        newWriter.init(writerContext);
+        newWriter.setGeneration(writerGeneration.incrementAndGet());
+        return newWriter;
+    }
+}
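
The shard selection in makeWriterPath() above is the interesting bit: hashing
only the server name would pin each regionserver to one shard, so the
timestamp hash is XORed in to spread hot servers across all shards. A
standalone illustration of the arithmetic, with placeholder values (the
configured shard count and the exact SHARD_DIR_FORMAT string are defined in
ReplicationLogGroup and not shown in this hunk):

    long timestamp = 1719848389000L; // EnvironmentEdgeManager.currentTimeMillis() in the patch
    int serverHash = "rs1.example.com,60020,1719848000000".hashCode();
    int numShards = 128; // placeholder; not the actual configured default
    // floorMod keeps the index in [0, numShards) even when the XOR result is negative.
    int shard = Math.floorMod(serverHash ^ Long.hashCode(timestamp), numShards);
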
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
new file mode 100644
index 0000000000..462ab6d8a6
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store-and-forward replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class is a stub implementation for future store-and-forward replication functionality.
+ * Store-and-forward mode is used when the standby cluster is temporarily unavailable - mutations
+ * are stored locally and forwarded when connectivity is restored.
+ * <p>
+ * Currently this is a stub whose overridden methods are unimplemented (TODO) no-ops.
+ * Future implementation will include:
+ * <ul>
+ *   <li>Local storage of mutations when standby is unavailable</li>
+ *   <li>Background forwarding when connectivity is restored</li>
+ *   <li>Proper error handling and retry logic</li>
+ *   <li>Integration with HA state management</li>
+ *   <li>Dual-mode operation: local storage + forwarding</li>
+ * </ul>
+ */
+public class StoreAndForwardLogGroupWriter extends ReplicationLogGroupWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreAndForwardLogGroupWriter.class);
+
+    /**
+     * Constructor for StoreAndForwardLogGroupWriter.
+     */
+    public StoreAndForwardLogGroupWriter(ReplicationLogGroup logGroup) {
+        super(logGroup);
+        LOG.debug("Created StoreAndForwardLogGroupWriter for HA Group: {}",
+            logGroup.getHaGroupName());
+    }
+
+    @Override
+    public void init() throws IOException {
+        // TODO
+    }
+
+    @Override
+    public void close() {
+        // TODO
+    }
+
+    @Override
+    protected void initializeFileSystems() throws IOException {
+        // TODO
+    }
+
+    @Override
+    protected LogFileWriter createNewWriter() throws IOException {
+        // TODO
+        return null;
+    }
+}
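
Since all four overrides are stubs, a speculative sketch of the eventual
createNewWriter() may help frame the follow-up work. Everything here is an
assumption, not part of this patch: the staging config key and local path
layout are hypothetical, and only the base-class members (getFileSystem(),
compression, writerGeneration) come from the refactor above:

    @Override
    protected LogFileWriter createNewWriter() throws IOException {
        // Speculative: stage the log on the local cluster filesystem for later forwarding.
        Configuration conf = logGroup.getConfiguration();
        Path stagingDir = new Path(conf.get("phoenix.replication.staging.dir", // hypothetical key
            "/phoenix/replication/staging"), logGroup.getHaGroupName());
        FileSystem localFs = getFileSystem(stagingDir.toUri());
        Path filePath = new Path(stagingDir, EnvironmentEdgeManager.currentTimeMillis() + "-"
            + logGroup.getServerName() + ".plog");
        LogFileWriterContext writerContext = new LogFileWriterContext(conf)
            .setFileSystem(localFs).setFilePath(filePath).setCompression(compression);
        LogFileWriter newWriter = new LogFileWriter();
        newWriter.init(writerContext);
        newWriter.setGeneration(writerGeneration.incrementAndGet());
        return newWriter;
    }
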
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
similarity index 91%
rename from phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
rename to phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index d2ff4a497f..95297ea34d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -19,12 +19,12 @@ package org.apache.phoenix.replication.metrics;
 
 import org.apache.hadoop.hbase.metrics.BaseSource;
 
-/** Interface for metrics related to ReplicationLog operations. */
-public interface MetricsReplicationLogSource extends BaseSource {
+/** Interface for metrics related to ReplicationLogGroup operations. */
+public interface MetricsReplicationLogGroupSource extends BaseSource {
 
-    String METRICS_NAME = "ReplicationLog";
+    String METRICS_NAME = "ReplicationLogGroup";
     String METRICS_CONTEXT = "phoenix";
-    String METRICS_DESCRIPTION = "Metrics about Phoenix Replication Log Operations";
+    String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for an HA Group";
     String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
 
     String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
@@ -109,7 +109,11 @@ public interface MetricsReplicationLogSource extends BaseSource {
      */
     void incrementRotationFailureCount();
 
+    /**
+     * Unregister this metrics source.
+     */
+    void close();
+
     // Get current values for testing
     ReplicationLogMetricValues getCurrentMetricValues();
-
 }
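
A hedged sketch of a call site for this interface, limited to methods visible
in this patch (getMetrics() appears in the writer refactor above;
ringBufferTimeNs is an illustrative variable):

    MetricsReplicationLogGroupSource metrics = logGroup.getMetrics();
    metrics.incrementTimeBasedRotationCount(); // after a successful time-based roll
    metrics.incrementRotationFailureCount(); // after a failed roll attempt
    metrics.updateRingBufferTime(ringBufferTimeNs); // nanoseconds an event waited in the ring
    metrics.close(); // unregister when the owning ReplicationLogGroup shuts down
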
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
similarity index 87%
rename from phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
rename to phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index 5c6f4754ac..9ad902d4a5 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -18,12 +18,13 @@
 package org.apache.phoenix.replication.metrics;
 
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 
 /** Implementation of metrics source for ReplicationLog operations. */
-public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
-        implements MetricsReplicationLogSource {
+public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
+        implements MetricsReplicationLogGroupSource {
 
     private final MutableFastCounter timeBasedRotationCount;
     private final MutableFastCounter sizeBasedRotationCount;
@@ -35,13 +36,14 @@ public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
     private final MutableHistogram rotationTime;
     private final MutableHistogram ringBufferTime;
 
-    public MetricsReplicationLogSourceImpl() {
-        this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+    public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
+        this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, haGroupName);
     }
 
-    public MetricsReplicationLogSourceImpl(String metricsName, String metricsDescription,
-        String metricsContext, String metricsJmxContext) {
-        super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+    public MetricsReplicationLogGroupSourceImpl(String metricsName, String metricsDescription,
+        String metricsContext, String metricsJmxContext, String haGroupName) {
+        super(metricsName, metricsDescription, metricsContext, metricsJmxContext + ",haGroup="
+            + haGroupName);
         timeBasedRotationCount = getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
             TIME_BASED_ROTATION_COUNT_DESC, 0L);
         sizeBasedRotationCount = getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
@@ -58,6 +60,11 @@ public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
             RING_BUFFER_TIME_DESC);
     }
 
+    @Override
+    public void close() {
+        DefaultMetricsSystem.instance().unregisterSource(metricsJmxContext);
+    }
+
     @Override
     public void incrementTimeBasedRotationCount() {
         timeBasedRotationCount.incr();
@@ -132,10 +139,4 @@ public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
     public String getMetricsContext() {
         return METRICS_CONTEXT;
     }
-
-    @Override
-    public String getMetricsJmxContext() {
-        return METRICS_JMX_CONTEXT;
-    }
-
 }
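
The haGroupName suffix gives each HA group its own JMX source, which is what
lets the test tearDown below drop its manual DefaultMetricsSystem
unregistration. A brief lifecycle sketch, with a placeholder group name:

    // Registers under METRICS_JMX_CONTEXT + ",haGroup=group1", i.e.
    // "RegionServer,sub=ReplicationLogGroup,haGroup=group1".
    MetricsReplicationLogGroupSource metrics = new MetricsReplicationLogGroupSourceImpl("group1");
    metrics.incrementTimeBasedRotationCount();
    // close() unregisters the source so the same name can be registered again later.
    metrics.close();
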
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
similarity index 72%
rename from phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
rename to phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 498ba6338f..437f946647 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -44,25 +44,14 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.phoenix.replication.ReplicationLog.RotationReason;
-import org.apache.phoenix.replication.log.LogFile;
-import org.apache.phoenix.replication.log.LogFileReader;
-import org.apache.phoenix.replication.log.LogFileReaderContext;
-import org.apache.phoenix.replication.log.LogFileTestUtil;
 import org.apache.phoenix.replication.log.LogFileWriter;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.junit.After;
 import org.junit.Before;
@@ -75,10 +64,16 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.ReplicationLogGroupWriter.RotationReason;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
 
-public class ReplicationLogTest {
+public class ReplicationLogGroupTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupTest.class);
 
     @ClassRule
     public static TemporaryFolder testFolder = new TemporaryFolder();
@@ -87,7 +82,7 @@ public class ReplicationLogTest {
     private ServerName serverName;
     private FileSystem localFs;
     private URI standbyUri;
-    private ReplicationLog logWriter;
+    private ReplicationLogGroup logGroup;
 
     static final int TEST_RINGBUFFER_SIZE = 32;
     static final int TEST_SYNC_TIMEOUT = 1000;
@@ -100,30 +95,26 @@ public class ReplicationLogTest {
         localFs = FileSystem.getLocal(conf);
         standbyUri = new Path(testFolder.toString()).toUri();
         serverName = ServerName.valueOf("test", 60010, EnvironmentEdgeManager.currentTimeMillis());
-        conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString());
+        conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString());
         // Small ring buffer size for testing
-        conf.setInt(ReplicationLog.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, TEST_RINGBUFFER_SIZE);
+        conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, TEST_RINGBUFFER_SIZE);
         // Set a short sync timeout for testing
-        conf.setLong(ReplicationLog.REPLICATION_LOG_SYNC_TIMEOUT_KEY, TEST_SYNC_TIMEOUT);
+        conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY, TEST_SYNC_TIMEOUT);
         // Set rotation time to 10 seconds
-        conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY, TEST_ROTATION_TIME);
+        conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY, TEST_ROTATION_TIME);
         // Small size threshold for testing
-        conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+        conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
             TEST_ROTATION_SIZE_BYTES);
 
-        logWriter = spy(new TestableReplicationLog(conf, serverName));
-        logWriter.init();
+        logGroup = new TestableLogGroup(conf, serverName, "testHAGroup");
+        logGroup.init();
     }
 
     @After
     public void tearDown() throws Exception {
-        if (logWriter != null) {
-            logWriter.close();
+        if (logGroup != null) {
+            logGroup.close();
         }
-        // Deregister the metrics source that the replication log registers during initialization
-        // so the next unit will be able to register it again and successfully initialize.
-        DefaultMetricsSystem.instance()
-            .unregisterSource(MetricsReplicationLogSource.METRICS_JMX_CONTEXT);
     }
 
     /**
@@ -146,17 +137,17 @@ public class ReplicationLogTest {
         final Mutation put5 = LogFileTestUtil.newPut("row5", 5, 1);
 
         // Get the inner writer
-        LogFileWriter writer = logWriter.getWriter();
+        LogFileWriter writer = logGroup.getActiveWriter().getWriter();
         assertNotNull("Writer should not be null", writer);
         InOrder inOrder = Mockito.inOrder(writer);
 
-        logWriter.append(tableName, commitId1, put1);
-        logWriter.append(tableName, commitId2, put2);
-        logWriter.append(tableName, commitId3, put3);
-        logWriter.append(tableName, commitId4, put4);
-        logWriter.append(tableName, commitId5, put5);
+        logGroup.append(tableName, commitId1, put1);
+        logGroup.append(tableName, commitId2, put2);
+        logGroup.append(tableName, commitId3, put3);
+        logGroup.append(tableName, commitId4, put4);
+        logGroup.append(tableName, commitId5, put5);
 
-        logWriter.sync();
+        logGroup.sync();
 
         // Happens-before ordering verification, using Mockito's inOrder. Verify that the appends
         // happen before sync, and sync happened after appends.
@@ -168,41 +159,6 @@ public class ReplicationLogTest {
         inOrder.verify(writer, times(1)).sync();
     }
 
-    /**
-     * Tests the behavior when an append operation fails. Verifies that the system properly handles
-     * append failures by rolling to a new writer and retrying the operation.
-     */
-    @Test
-    public void testAppendFailureAndRetry() throws Exception {
-        final String tableName = "TBLAFR";
-        final long commitId = 1L;
-        final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
-        // Get the inner writer
-        LogFileWriter writerBeforeRoll = logWriter.getWriter();
-        assertNotNull("Initial writer should not be null", writerBeforeRoll);
-
-        // Configure writerBeforeRoll to fail on the first append call
-        doThrow(new IOException("Simulated append failure"))
-            .when(writerBeforeRoll).append(anyString(), anyLong(), any(Mutation.class));
-
-        // Append data
-        logWriter.append(tableName, commitId, put);
-        logWriter.sync();
-
-        // Get the inner writer we rolled to.
-        LogFileWriter writerAfterRoll = logWriter.getWriter();
-        assertNotNull("Rolled writer should not be null", writerAfterRoll);
-
-        // Verify the sequence: append (fail), rotate, append (succeed), sync
-        InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
-        inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), eq(commitId), eq(put));
-        inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append, did not try
-        inOrder.verify(writerAfterRoll, times(1))
-            .append(eq(tableName), eq(commitId), eq(put)); // Retry
-        inOrder.verify(writerAfterRoll, times(1)).sync();
-    }
-
     /**
      * Tests the behavior when a sync operation fails. Verifies that the system properly handles
      * sync failures by rolling to a new writer and retrying the operation.
@@ -214,18 +170,18 @@ public class ReplicationLogTest {
       final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
       // Get the inner writer
-      LogFileWriter writerBeforeRoll = logWriter.getWriter();
+      LogFileWriter writerBeforeRoll = logGroup.getActiveWriter().getWriter();
       assertNotNull("Initial writer should not be null", writerBeforeRoll);
 
       // Configure writerBeforeRoll to fail on the first sync call
       doThrow(new IOException("Simulated sync 
failure")).when(writerBeforeRoll).sync();
 
       // Append data
-      logWriter.append(tableName, commitId, put);
-      logWriter.sync();
+      logGroup.append(tableName, commitId, put);
+      logGroup.sync();
 
       // Get the inner writer we rolled to.
-      LogFileWriter writerAfterRoll = logWriter.getWriter();
+      LogFileWriter writerAfterRoll = logGroup.getActiveWriter().getWriter();
       assertNotNull("Initial writer should not be null", writerBeforeRoll);
 
       // Verify the sequence: append, sync (fail), rotate, append (retry), sync (succeed)
@@ -248,7 +204,7 @@ public class ReplicationLogTest {
         long commitId = 0;
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Inner writer should not be null", innerWriter);
 
         // Create a slow consumer to fill up the ring buffer.
@@ -262,7 +218,7 @@ public class ReplicationLogTest {
 
         // Fill up the ring buffer by sending enough events.
         for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
-            logWriter.append(tableName, commitId++, put);
+            logGroup.append(tableName, commitId++, put);
         }
 
         // Now try to append when the ring is full. This should block until space becomes
@@ -273,7 +229,7 @@ public class ReplicationLogTest {
         Thread appendThread = new Thread(() -> {
             try {
                 startFuture.complete(null);
-                logWriter.append(tableName, myCommitId, put);
+                logGroup.append(tableName, myCommitId, put);
                 appendFuture.complete(null);
             } catch (IOException e) {
                 appendFuture.completeExceptionally(e);
@@ -298,6 +254,41 @@ public class ReplicationLogTest {
         verify(innerWriter, timeout(10000).times(1)).append(eq(tableName), eq(myCommitId), any());
     }
 
+    /**
+     * Tests the behavior when an append operation fails. Verifies that the system properly handles
+     * append failures by rolling to a new writer and retrying the operation.
+     */
+    @Test
+    public void testAppendFailureAndRetry() throws Exception {
+        final String tableName = "TBLAFR";
+        final long commitId = 1L;
+        final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+        // Get the inner writer
+        LogFileWriter writerBeforeRoll = logGroup.getActiveWriter().getWriter();
+        assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+        // Configure writerBeforeRoll to fail on the first append call
+        doThrow(new IOException("Simulated append failure"))
+            .when(writerBeforeRoll).append(anyString(), anyLong(), any(Mutation.class));
+
+        // Append data
+        logGroup.append(tableName, commitId, put);
+        logGroup.sync();
+
+        // Get the inner writer we rolled to.
+        LogFileWriter writerAfterRoll = logGroup.getActiveWriter().getWriter();
+        assertNotNull("Rolled writer should not be null", writerAfterRoll);
+
+        // Verify the sequence: append (fail), rotate, append (succeed), sync
+        InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+        inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), eq(commitId), eq(put));
+        inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append, did not try
+        inOrder.verify(writerAfterRoll, times(1))
+            .append(eq(tableName), eq(commitId), eq(put)); // Retry
+        inOrder.verify(writerAfterRoll, times(1)).sync();
+    }
+
     /**
      * Tests the sync timeout behavior. Verifies that sync operations time out after the configured
      * interval if they cannot complete.
@@ -309,7 +300,7 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Inner writer should not be null", innerWriter);
 
         doAnswer(new Answer<Object>() {
@@ -322,11 +313,11 @@ public class ReplicationLogTest {
         }).when(innerWriter).sync();
 
         // Append some data
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
 
         // Try to sync and expect it to timeout
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Expected sync to timeout");
         } catch (IOException e) {
             assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -347,7 +338,7 @@ public class ReplicationLogTest {
         final CountDownLatch completionLatch = new CountDownLatch(2);
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Inner writer should not be null", innerWriter);
 
         // Thread 1: Append mutations with even commit IDs
@@ -357,7 +348,7 @@ public class ReplicationLogTest {
                 for (int i = 0; i < APPENDS_PER_THREAD; i++) {
                     final long commitId = i * 2;
                     final Mutation put = LogFileTestUtil.newPut("row" + 
commitId, commitId, 1);
-                    logWriter.append(tableName, commitId, put);
+                    logGroup.append(tableName, commitId, put);
                 }
             } catch (Exception e) {
                 fail("Producer 1 failed: " + e.getMessage());
@@ -373,7 +364,7 @@ public class ReplicationLogTest {
                 for (int i = 0; i < APPENDS_PER_THREAD; i++) {
                     final long commitId = i * 2 + 1;
                     final Mutation put = LogFileTestUtil.newPut("row" + 
commitId, commitId, 1);
-                    logWriter.append(tableName, commitId, put);
+                    logGroup.append(tableName, commitId, put);
                 }
             } catch (Exception e) {
                 fail("Producer 2 failed: " + e.getMessage());
@@ -392,7 +383,7 @@ public class ReplicationLogTest {
 
         // Perform a sync to ensure all appends are processed.
         InOrder inOrder = Mockito.inOrder(innerWriter); // To verify the below sync.
-        logWriter.sync();
+        logGroup.sync();
         // Verify the final sync was called.
         inOrder.verify(innerWriter, times(1)).sync();
 
@@ -401,7 +392,6 @@ public class ReplicationLogTest {
             final long commitId = i;
             verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
         }
-
     }
 
     /**
@@ -415,22 +405,22 @@ public class ReplicationLogTest {
         final long commitId = 1L;
 
         // Get the initial writer
-        LogFileWriter writerBeforeRotation = logWriter.getWriter();
+        LogFileWriter writerBeforeRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
         // Append some data
-        logWriter.append(tableName, commitId, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId, put);
+        logGroup.sync();
 
         // Wait for rotation time to elapse
         Thread.sleep((long)(TEST_ROTATION_TIME * 1.25));
 
         // Append more data to trigger rotation check
-        logWriter.append(tableName, commitId + 1, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId + 1, put);
+        logGroup.sync();
 
         // Get the new writer after rotation
-        LogFileWriter writerAfterRotation = logWriter.getWriter();
+        LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("New writer should not be null", writerAfterRotation);
         assertTrue("Writer should have been rotated", writerAfterRotation != writerBeforeRotation);
 
@@ -456,23 +446,23 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
         long commitId = 1L;
 
-        LogFileWriter writerBeforeRotation = logWriter.getWriter();
+        LogFileWriter writerBeforeRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
         // Append enough data so that we exceed the size threshold.
         for (int i = 0; i < 100; i++) {
-            logWriter.append(tableName, commitId++, put);
+          logGroup.append(tableName, commitId++, put);
         }
-        logWriter.sync(); // Should trigger a sized based rotation
+        logGroup.sync(); // Should trigger a sized based rotation
 
         // Get the new writer after the expected rotation.
-        LogFileWriter writerAfterRotation = logWriter.getWriter();
+        LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("New writer should not be null", writerAfterRotation);
         assertTrue("Writer should have been rotated", writerAfterRotation != writerBeforeRotation);
 
         // Append one more mutation to verify we're using the new writer.
-        logWriter.append(tableName, commitId, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId, put);
+        logGroup.sync();
 
         // Verify the sequence of operations
         InOrder inOrder = Mockito.inOrder(writerBeforeRotation, writerAfterRotation);
@@ -499,21 +489,21 @@ public class ReplicationLogTest {
         final long commitId = 1L;
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Inner writer should not be null", innerWriter);
 
         // Append some data
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
 
         // Close the log writer
-        logWriter.close();
+        logGroup.close();
 
         // Verify the inner writer was closed
         verify(innerWriter, times(1)).close();
 
         // Verify we can't append after close
         try {
-            logWriter.append(tableName, commitId + 1, put);
+            logGroup.append(tableName, commitId + 1, put);
             fail("Expected append to fail after close");
         } catch (IOException e) {
             // Expected
@@ -521,14 +511,14 @@ public class ReplicationLogTest {
 
         // Verify we can't sync after close
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Expected sync to fail after close");
         } catch (IOException e) {
             // Expected
         }
 
         // Verify we can close multiple times without error
-        logWriter.close();
+        logGroup.close();
     }
 
     /**
@@ -541,16 +531,16 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
         long commitId = 1L;
 
-        LogFileWriter writerBeforeRotation = logWriter.getWriter();
+        LogFileWriter writerBeforeRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
         // Append some data and wait for the rotation time to elapse plus a small buffer.
-        logWriter.append(tableName, commitId, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId, put);
+        logGroup.sync();
         Thread.sleep((long)(TEST_ROTATION_TIME * 1.25));
 
         // Get the new writer after the rotation.
-        LogFileWriter writerAfterRotation = logWriter.getWriter();
+        LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("New writer should not be null", writerAfterRotation);
         assertTrue("Writer should have been rotated", writerAfterRotation != writerBeforeRotation);
 
@@ -582,8 +572,10 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
         long commitId = 1L;
 
+        ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
         // Get the initial writer
-        LogFileWriter initialWriter = logWriter.getWriter();
+        LogFileWriter initialWriter = logGroupWriter.getWriter();
         assertNotNull("Initial writer should not be null", initialWriter);
 
         // Configure the log writer to fail only the first time when creating new writers.
@@ -593,27 +585,27 @@ public class ReplicationLogTest {
                 throw new IOException("Simulated failure to create new 
writer");
             }
             return invocation.callRealMethod();
-        }).when(logWriter).createNewWriter(any(FileSystem.class), any(URI.class));
+        }).when(logGroupWriter).createNewWriter();
 
         // Append some data
-        logWriter.append(tableName, commitId, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId, put);
+        logGroup.sync();
 
         // Rotate the log.
-        LogFileWriter writerAfterFailedRotate = logWriter.rotateLog(RotationReason.TIME);
+        LogFileWriter writerAfterFailedRotate = logGroupWriter.rotateLog(RotationReason.TIME);
         assertEquals("Should still be using the initial writer", initialWriter,
             writerAfterFailedRotate);
 
         // While rotation is failing, verify we can continue to use the current writer.
-        logWriter.append(tableName, commitId + 1, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId + 1, put);
+        logGroup.sync();
 
-        LogFileWriter writerAfterRotate = logWriter.rotateLog(RotationReason.TIME);
+        LogFileWriter writerAfterRotate = logGroupWriter.rotateLog(RotationReason.TIME);
         assertNotEquals("Should be using a new writer", initialWriter, writerAfterRotate);
 
         // Try to append more data. This should work with the new writer after successful rotation.
-        logWriter.append(tableName, commitId + 2, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId + 2, put);
+        logGroup.sync();
 
         // Verify operations went to the writers in the correct order
         InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
@@ -638,23 +630,26 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
         long commitId = 1L;
 
-        LogFileWriter initialWriter = logWriter.getWriter();
+        ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
+        // Get the initial writer
+        LogFileWriter initialWriter = logGroupWriter.getWriter();
         assertNotNull("Initial writer should not be null", initialWriter);
 
         // Configure the log writer to always fail when creating new writers
         doThrow(new IOException("Simulated failure to create new writer"))
-            .when(logWriter).createNewWriter(any(FileSystem.class), any(URI.class));
+            .when(logGroupWriter).createNewWriter();
 
         // Append some data
-        logWriter.append(tableName, commitId, put);
-        logWriter.sync();
+        logGroup.append(tableName, commitId, put);
+        logGroup.sync();
 
         // Try to rotate the log multiple times until we exceed the retry limit
-        for (int i = 0; i <= ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
+        for (int i = 0; i <= ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
             try {
-                logWriter.rotateLog(RotationReason.TIME);
+                logGroupWriter.rotateLog(RotationReason.TIME);
             } catch (IOException e) {
-                if (i < ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
+                if (i < ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
                     // Not the last attempt yet, continue
                     continue;
                 }
@@ -668,8 +663,8 @@ public class ReplicationLogTest {
 
         // Verify subsequent operations fail because the log is closed
         try {
-            logWriter.append(tableName, commitId + 1, put);
-            logWriter.sync();
+            logGroup.append(tableName, commitId + 1, put);
+            logGroup.sync();
             fail("Expected append to fail because log is closed");
         } catch (IOException e) {
             assertTrue("Expected an IOException because log is closed",
@@ -688,7 +683,7 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Writer should not be null", innerWriter);
 
         // Configure writer to throw a RuntimeException on append
@@ -696,9 +691,9 @@ public class ReplicationLogTest {
             .when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
 
         // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Should have thrown IOException because sync timed out");
         } catch (IOException e) {
             assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -706,7 +701,7 @@ public class ReplicationLogTest {
 
         // Verify that subsequent operations fail because the log is closed
         try {
-            logWriter.append(tableName, commitId + 1, put);
+            logGroup.append(tableName, commitId + 1, put);
             fail("Should have thrown IOException because log is closed");
         } catch (IOException e) {
           assertTrue("Expected an IOException because log is closed",
@@ -727,8 +722,10 @@ public class ReplicationLogTest {
         final long commitId = 1L;
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
+        ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
         // Get the initial writer
-        LogFileWriter initialWriter = logWriter.getWriter();
+        LogFileWriter initialWriter = logGroupWriter.getWriter();
         assertNotNull("Initial writer should not be null", initialWriter);
 
         // Configure initial writer to fail on sync
@@ -736,22 +733,21 @@ public class ReplicationLogTest {
             .when(initialWriter).sync();
 
         // createNewWriter should keep returning the bad writer
-        doAnswer(invocation -> initialWriter).when(logWriter)
-            .createNewWriter(any(FileSystem.class), any(URI.class));
+        doAnswer(invocation -> initialWriter).when(logGroupWriter).createNewWriter();
 
         // Append data
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
 
         // Try to sync. Should fail after exhausting retries.
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Expected sync to fail after exhausting retries");
         } catch (IOException e) {
             assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
         }
 
         // Each retry creates a new writer, so that is at least 1 create + 5 retries.
-        verify(logWriter, atLeast(6)).createNewWriter(any(FileSystem.class), any(URI.class));
+        verify(logGroupWriter, atLeast(6)).createNewWriter();
     }
 
     /**
@@ -765,24 +761,24 @@ public class ReplicationLogTest {
         long commitId = 1L;
 
         // Get the initial writer
-        LogFileWriter writerBeforeRotation = logWriter.getWriter();
+        LogFileWriter writerBeforeRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
         // Append several items to fill currentBatch but don't sync yet
         for (int i = 0; i < 5; i++) {
-            logWriter.append(tableName, commitId + i, put);
+            logGroup.append(tableName, commitId + i, put);
         }
 
         // Force a rotation by waiting for rotation time to elapse
         Thread.sleep((long)(TEST_ROTATION_TIME * 1.25));
 
         // Get the new writer after rotation
-        LogFileWriter writerAfterRotation = logWriter.getWriter();
+        LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
         assertNotNull("New writer should not be null", writerAfterRotation);
         assertTrue("Writer should have been rotated", writerAfterRotation != writerBeforeRotation);
 
         // Now trigger a sync which should replay the currentBatch to the new writer
-        logWriter.sync();
+        logGroup.sync();
 
         // Verify the sequence of operations
         InOrder inOrder = Mockito.inOrder(writerBeforeRotation, writerAfterRotation);
@@ -816,19 +812,21 @@ public class ReplicationLogTest {
         final int NUM_RECORDS = 100;
         List<LogFile.Record> originalRecords = new ArrayList<>();
 
+        ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
         // Get the path of the log file.
-        Path logPath = logWriter.getWriter().getContext().getFilePath();
+        Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
 
         for (int i = 0; i < NUM_RECORDS; i++) {
             LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, i, "row" + i, i, 1);
             originalRecords.add(record);
-            logWriter.append(record.getHBaseTableName(), record.getCommitId(),
+            logGroup.append(record.getHBaseTableName(), record.getCommitId(),
                 record.getMutation());
         }
-        logWriter.sync(); // Sync to commit the appends to the current writer.
+        logGroup.sync(); // Sync to commit the appends to the current writer.
 
         // Force a rotation to close the current writer.
-        logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+        logGroupWriter.rotateLog(RotationReason.SIZE);
 
         assertTrue("Log file should exist", localFs.exists(logPath));
 
@@ -870,10 +868,12 @@ public class ReplicationLogTest {
         List<LogFile.Record> originalRecords = new ArrayList<>();
         List<Path> logPaths = new ArrayList<>();
 
+        ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
         // Write records across multiple rotations.
         for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
             // Get the path of the current log file.
-            Path logPath = logWriter.getWriter().getContext().getFilePath();
+            Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
             logPaths.add(logPath);
 
             for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
@@ -881,12 +881,12 @@ public class ReplicationLogTest {
                 LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, commitId,
                     "row" + commitId, commitId, 1);
                 originalRecords.add(record);
-                logWriter.append(record.getHBaseTableName(), record.getCommitId(),
+                logGroup.append(record.getHBaseTableName(), record.getCommitId(),
                     record.getMutation());
             }
-            logWriter.sync(); // Sync to commit the appends to the current writer.
+            logGroup.sync(); // Sync to commit the appends to the current writer.
             // Force a rotation to close the current writer.
-            logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+            logGroupWriter.rotateLog(RotationReason.SIZE);
         }
 
         // Verify all log files exist
@@ -937,10 +937,12 @@ public class ReplicationLogTest {
         List<LogFile.Record> originalRecords = new ArrayList<>();
         List<Path> logPaths = new ArrayList<>();
 
+        ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
         // Write records across multiple rotations, only syncing 50% of the time.
         for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
             // Get the path of the current log file.
-            Path logPath = logWriter.getWriter().getContext().getFilePath();
+            Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
             logPaths.add(logPath);
 
             for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
@@ -948,17 +950,17 @@ public class ReplicationLogTest {
                 LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, commitId,
                     "row" + commitId, commitId, 1);
                 originalRecords.add(record);
-                logWriter.append(record.getHBaseTableName(), record.getCommitId(),
+                logGroup.append(record.getHBaseTableName(), record.getCommitId(),
                     record.getMutation());
             }
 
             // Only sync 50% of the time before rotation. To ensure we sync on the last file
             // we are going to write, use 'rotation % 2 == 1' instead of 'rotation % 2 == 0'.
             if (rotation % 2 == 1) {
-                logWriter.sync(); // Sync to commit the appends to the current writer.
+                logGroup.sync(); // Sync to commit the appends to the current writer.
             }
             // Force a rotation to close the current writer.
-            logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+            logGroupWriter.rotateLog(RotationReason.SIZE);
         }
 
         // Verify all log files exist
@@ -1004,18 +1006,18 @@ public class ReplicationLogTest {
         final long commitId = 1L;
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
-        // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
-        assertNotNull("Writer should not be null", innerWriter);
+        // Get the initial writer
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
+        assertNotNull("Inner writer should not be null", innerWriter);
 
         // Configure writer to throw RuntimeException on getLength()
         doThrow(new RuntimeException("Simulated critical error"))
             .when(innerWriter).getLength();
 
         // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Should have thrown IOException because sync timed out");
         } catch (IOException e) {
             assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -1023,7 +1025,7 @@ public class ReplicationLogTest {
 
         // Verify that subsequent operations fail because the log is closed
         try {
-            logWriter.append(tableName, commitId + 1, put);
+            logGroup.append(tableName, commitId + 1, put);
             fail("Should have thrown IOException because log is closed");
         } catch (IOException e) {
             assertTrue("Expected an IOException because log is closed",
@@ -1045,7 +1047,7 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Writer should not be null", innerWriter);
 
         // Configure writer to throw RuntimeException on append
@@ -1053,9 +1055,9 @@ public class ReplicationLogTest {
             .when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
 
         // Append data to trigger closeOnError()
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Should have thrown IOException because sync timed out");
         } catch (IOException e) {
             assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -1063,7 +1065,7 @@ public class ReplicationLogTest {
 
         // Verify that subsequent append operations fail because the log is closed
         try {
-            logWriter.append(tableName, commitId, put);
+            logGroup.append(tableName, commitId, put);
             fail("Should have thrown IOException because log is closed");
         } catch (IOException e) {
             assertTrue("Expected an IOException because log is closed",
@@ -1085,7 +1087,7 @@ public class ReplicationLogTest {
         final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
         // Get the inner writer
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Writer should not be null", innerWriter);
 
         // Configure writer to throw RuntimeException on append
@@ -1093,9 +1095,9 @@ public class ReplicationLogTest {
             .when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
 
         // Append data to trigger closeOnError()
-        logWriter.append(tableName, commitId, put);
+        logGroup.append(tableName, commitId, put);
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Should have thrown IOException because sync timed out");
         } catch (IOException e) {
             assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -1103,7 +1105,7 @@ public class ReplicationLogTest {
 
         // Verify that subsequent sync operations fail because the log is closed
         try {
-            logWriter.sync();
+            logGroup.sync();
             fail("Should have thrown IOException because log is closed");
         } catch (IOException e) {
             assertTrue("Expected an IOException because log is closed",
@@ -1114,176 +1116,6 @@ public class ReplicationLogTest {
         verify(innerWriter, times(1)).close();
     }
 
-    /**
-     * Tests race condition between LogRotationTask and LogEventHandler when both try to rotate
-     * the writer simultaneously. Verifies that despite concurrent rotation attempts, the log is
-     * only rotated once. Uses latches to ensure true concurrency and verify the sequence of
-     * operations.
-     */
-    @Test
-    public void testConcurrentRotationAttempts() throws Exception {
-        final String tableName = "TBLCR";
-        final long commitId = 1L;
-        final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
-        // Get the initial writer
-        LogFileWriter initialWriter = logWriter.getWriter();
-        assertNotNull("Initial writer should not be null", initialWriter);
-
-        // Create latches to control timing and track rotation attempts
-        final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
-        final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
-        final CountDownLatch eventHandlerStarted = new CountDownLatch(1);
-        final CountDownLatch eventHandlerCanProceed = new CountDownLatch(1);
-        final AtomicInteger rotationCount = new AtomicInteger(0);
-        final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
-
-        // Configure the rotation task to pause at specific points and track attempts
-        doAnswer(invocation -> {
-            rotationTaskStarted.countDown(); // Signal that rotation task has started
-            bothAttemptsStarted.countDown(); // Signal that this attempt has started
-            rotationTaskCanProceed.await();  // Wait for permission to proceed
-            rotationCount.incrementAndGet(); // Track this rotation attempt
-            return invocation.callRealMethod();
-        }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
-
-        // Configure the event handler to pause at specific points
-        doAnswer(invocation -> {
-            eventHandlerStarted.countDown(); // Signal that event handler has started
-            bothAttemptsStarted.countDown(); // Signal that this attempt has started
-            eventHandlerCanProceed.await();  // Wait for permission to proceed
-            return invocation.callRealMethod();
-        }).when(logWriter).getWriter();
-
-        // Start a thread that will trigger rotation via the background task
-        Thread rotationThread = new Thread(() -> {
-            try {
-                // Force rotation by waiting for rotation time
-                Thread.sleep((long)(TEST_ROTATION_TIME * 1.25));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-        });
-        rotationThread.start();
-
-        // Start append operation in main thread
-        logWriter.append(tableName, commitId, put);
-
-        // Wait for both attempts to start - this ensures true concurrency
-        assertTrue("Both rotation attempts should start",
-            bothAttemptsStarted.await(5, TimeUnit.SECONDS));
-
-        // Verify both attempts have started before proceeding
-        assertEquals("Both attempts should have started", 0, 
bothAttemptsStarted.getCount());
-
-        // Allow both operations to proceed simultaneously
-        eventHandlerCanProceed.countDown();
-        rotationTaskCanProceed.countDown();
-
-        // Wait for both operations to complete
-        rotationThread.join();
-
-        // Verify the final state
-        LogFileWriter finalWriter = logWriter.getWriter();
-        assertNotNull("Final writer should not be null", finalWriter);
-        assertTrue("Writer should have been rotated", finalWriter != 
initialWriter);
-
-        // Verify only one rotation actually occurred
-        assertEquals("Should have only one actual rotation", 1, 
rotationCount.get());
-
-        // Verify all operations completed successfully
-        logWriter.sync();
-
-        // Verify the sequence of operations through the latches
-        assertTrue("Rotation task should have started", 
rotationTaskStarted.getCount() == 0);
-        assertTrue("Event handler should have started", 
eventHandlerStarted.getCount() == 0);
-    }
-
-    /**
-     * Tests race condition between LogEventHandler retry loop and LogRotationTask when both
-     * try to rotate the writer simultaneously. Verifies that despite concurrent rotation attempts
-     * during a retry scenario, the log is only rotated once. Uses latches to ensure true
-     * concurrency and verify the sequence of operations.
-     */
-    @Test
-    public void testConcurrentRotationDuringRetry() throws Exception {
-        final String tableName = "TBLCRR";
-        final long commitId = 1L;
-        final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
-        // Get the initial writer
-        LogFileWriter initialWriter = logWriter.getWriter();
-        assertNotNull("Initial writer should not be null", initialWriter);
-
-        // Create latches to control timing and track rotation attempts
-        final CountDownLatch retryStarted = new CountDownLatch(1);
-        final CountDownLatch retryCanProceed = new CountDownLatch(1);
-        final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
-        final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
-        final AtomicInteger rotationCount = new AtomicInteger(0);
-        final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
-
-        // Configure the writer to fail on first append, succeed on retry
-        doAnswer(invocation -> {
-            retryStarted.countDown(); // Signal that retry has started
-            bothAttemptsStarted.countDown(); // Signal that this attempt has started
-            retryCanProceed.await(); // Wait for permission to proceed
-            return invocation.callRealMethod();
-        }).when(initialWriter).append(anyString(), anyLong(), any(Mutation.class));
-
-        // Configure the rotation task to pause at specific points and track attempts
-        doAnswer(invocation -> {
-            rotationTaskStarted.countDown(); // Signal that rotation task has started
-            bothAttemptsStarted.countDown(); // Signal that this attempt has started
-            rotationTaskCanProceed.await(); // Wait for permission to proceed
-            rotationCount.incrementAndGet(); // Track this rotation attempt
-            return invocation.callRealMethod();
-        }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
-
-        // Start a thread that will trigger rotation via the background task
-        Thread rotationThread = new Thread(() -> {
-            try {
-                // Force rotation by waiting for rotation time
-                Thread.sleep((long)(TEST_ROTATION_TIME * 1.25));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-        });
-        rotationThread.start();
-
-        // Start append operation in main thread
-        logWriter.append(tableName, commitId, put);
-
-        // Wait for both attempts to start - this ensures true concurrency
-        assertTrue("Both rotation attempts should start",
-            bothAttemptsStarted.await(5, TimeUnit.SECONDS));
-
-        // Verify both attempts have started before proceeding
-        assertEquals("Both attempts should have started", 0, 
bothAttemptsStarted.getCount());
-
-        // Allow both operations to proceed simultaneously
-        retryCanProceed.countDown();
-        rotationTaskCanProceed.countDown();
-
-        // Wait for both operations to complete
-        rotationThread.join();
-
-        // Verify the final state
-        LogFileWriter finalWriter = logWriter.getWriter();
-        assertNotNull("Final writer should not be null", finalWriter);
-        assertTrue("Writer should have been rotated", finalWriter != 
initialWriter);
-
-        // Verify only one rotation actually occurred
-        assertEquals("Should have only one actual rotation", 1, 
rotationCount.get());
-
-        // Verify all operations completed successfully
-        logWriter.sync();
-
-        // Verify the sequence of operations through the latches
-        assertTrue("Retry should have started", retryStarted.getCount() == 0);
-        assertTrue("Rotation task should have started", 
rotationTaskStarted.getCount() == 0);
-    }
-
     /**
      * Tests that multiple sync requests are consolidated into a single sync operation on the inner
      * writer when they occur in quick succession. Verifies that the Disruptor batching and
@@ -1300,7 +1132,7 @@ public class ReplicationLogTest {
         final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
         final long commitId3 = 3L;
 
-        LogFileWriter innerWriter = logWriter.getWriter();
+        LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
         assertNotNull("Inner writer should not be null", innerWriter);
 
         // Configure writer to briefly hold up the LogEventHandler upon first append.
@@ -1315,12 +1147,12 @@ public class ReplicationLogTest {
         // Post appends and three syncs in quick succession. The first append will be delayed long
         // enough for the three syncs to appear in a single Disruptor batch. Then they should all
         // be consolidated into a single sync.
-        logWriter.append(tableName, commitId1, put1);
-        logWriter.sync();
-        logWriter.append(tableName, commitId2, put2);
-        logWriter.sync();
-        logWriter.append(tableName, commitId3, put3);
-        logWriter.sync();
+        logGroup.append(tableName, commitId1, put1);
+        logGroup.sync();
+        logGroup.append(tableName, commitId2, put2);
+        logGroup.sync();
+        logGroup.append(tableName, commitId3, put3);
+        logGroup.sync();
 
         // Verify the sequence of operations on the inner writer: the three appends, then exactly
         // one sync.
@@ -1331,22 +1163,102 @@ public class ReplicationLogTest {
         inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be called
     }
 
-    static class TestableReplicationLog extends ReplicationLog {
+    /**
+     * Tests that ReplicationLogGroup.get() returns the same instance for the same haGroupId.
+     * Verifies that multiple calls with the same parameters return the cached instance.
+     */
+    @Test
+    public void testReplicationLogGroupCaching() throws Exception {
+        final String haGroupId1 = "testHAGroup1";
+        final String haGroupId2 = "testHAGroup2";
+
+        // Get instances for the first HA group
+        ReplicationLogGroup g1_1 = ReplicationLogGroup.get(conf, serverName, haGroupId1);
+        ReplicationLogGroup g1_2 = ReplicationLogGroup.get(conf, serverName, haGroupId1);
+
+        // Verify same instance is returned for same haGroupId
+        assertNotNull("ReplicationLogGroup should not be null", g1_1);
+        assertNotNull("ReplicationLogGroup should not be null", g1_2);
+        assertTrue("Same instance should be returned for same haGroupId", g1_2 
== g1_1);
+        assertEquals("HA Group name should match", haGroupId1, 
g1_1.getHaGroupName());
+
+        // Get instance for a different HA group
+        ReplicationLogGroup g2_1 = ReplicationLogGroup.get(conf, serverName, haGroupId2);
+        assertNotNull("ReplicationLogGroup should not be null", g2_1);
+        assertTrue("Different instance should be returned for different haGroupId", g2_1 != g1_1);
+        assertEquals("HA Group name should match", haGroupId2, g2_1.getHaGroupName());
+
+        // Verify multiple calls still return cached instances
+        ReplicationLogGroup g1_3 = ReplicationLogGroup.get(conf, serverName, haGroupId1);
+        ReplicationLogGroup g2_2 = ReplicationLogGroup.get(conf, serverName, haGroupId2);
+        assertTrue("Cached instance should be returned", g1_3 == g1_1);
+        assertTrue("Cached instance should be returned", g2_2 == g2_1);
+
+        // Clean up
+        g1_1.close();
+        g2_1.close();
+    }
+
+    /**
+     * Tests that close() removes the instance from the cache.
+     * Verifies that after closing, a new call to get() creates a new instance.
+     */
+    @Test
+    public void testReplicationLogGroupCacheRemovalOnClose() throws Exception {
+        final String haGroupId = "testHAGroupCacheRemoval";
+
+        // Get initial instance
+        ReplicationLogGroup g1_1 = ReplicationLogGroup.get(conf, serverName, haGroupId);
+        assertNotNull("ReplicationLogGroup should not be null", g1_1);
+        assertFalse("Group should not be closed initially", g1_1.isClosed());
+
+        // Verify cached instance is returned
+        ReplicationLogGroup g1_2 = ReplicationLogGroup.get(conf, serverName, haGroupId);
+        assertTrue("Same instance should be returned before close", g1_2 == g1_1);
+
+        // Close the group
+        g1_1.close();
+        assertTrue("Group should be closed", g1_1.isClosed());
+
+        // Get instance after close - should be a new instance
+        ReplicationLogGroup g1_3 = ReplicationLogGroup.get(conf, serverName, haGroupId);
+        assertNotNull("ReplicationLogGroup should not be null after close", g1_3);
+        assertFalse("New group should not be closed", g1_3.isClosed());
+        assertTrue("New instance should be created after close", g1_1 != g1_3);
+        assertEquals("HA Group name should match", haGroupId, 
g1_3.getHaGroupName());
+
+        // Clean up
+        g1_3.close();
+    }
+
+    static class TestableLogGroup extends ReplicationLogGroup {
 
-        protected TestableReplicationLog(Configuration conf, ServerName serverName) {
-            super(conf, serverName);
+        public TestableLogGroup(Configuration conf, ServerName serverName, String haGroupName) {
+            super(conf, serverName, haGroupName);
         }
 
         @Override
-        protected LogFileWriter createNewWriter(FileSystem fs,
-                URI url) throws IOException {
-            return spy(super.createNewWriter(fs, url));
+        protected ReplicationLogGroupWriter createRemoteWriter() throws IOException {
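+            // Return a Mockito spy so tests can stub and verify the group writer's behavior.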
+            ReplicationLogGroupWriter writer = spy(new TestableStandbyLogGroupWriter(this));
+            writer.init();
+            return writer;
+        }
+
+    }
+
+    /**
+     * Testable version of StandbyLogGroupWriter that allows spying on writers.
+     */
+    static class TestableStandbyLogGroupWriter extends StandbyLogGroupWriter {
+
+        protected TestableStandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+            super(logGroup);
         }
 
         @Override
-        protected MetricsReplicationLogSource createMetricsSource() {
-            return new MetricsReplicationLogSourceImpl();
+        protected LogFileWriter createNewWriter() throws IOException {
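+            // Wrap the real file writer in a Mockito spy so tests can inject failures
+            // (doThrow/doAnswer) and verify call ordering (inOrder) on the inner writer.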
+            LogFileWriter writer = super.createNewWriter();
+            return spy(writer);
         }
     }
-
 }
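
For reviewers who want the refactored call pattern at a glance, below is a minimal
usage sketch assembled from the tests above. It uses only calls visible in this
patch (get, append, sync, close); the server name, table, row, column family, and
HA group identifiers are illustrative placeholders, and error handling is elided,
so treat it as a sketch rather than canonical usage.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class ReplicationLogGroupSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            ServerName serverName = ServerName.valueOf("rs1.example.com,16020,1700000000000");
            // get() caches one instance per HA group; repeated calls return the same object
            ReplicationLogGroup logGroup = ReplicationLogGroup.get(conf, serverName, "haGroup1");
            try {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                logGroup.append("MY_TABLE", 1L, put); // enqueue the mutation under commit id 1
                logGroup.sync();                      // block until appended edits are durable
            } finally {
                logGroup.close();                     // also evicts the instance from the cache
            }
        }
    }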
