apurtell commented on code in PR #2144:
URL: https://github.com/apache/phoenix/pull/2144#discussion_r2109552236


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java:
##########
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+/**
+ * Manages the lifecycle of replication log files on the active cluster side. 
It handles log
+ * rotation based on time and size thresholds and provides the currently 
active LogFileWriter.
+ * This class is intended to be thread-safe.
+ * <p>
+ * Architecture Overview:
+ * <pre>
+ * ┌──────────────────────────────────────────────────────────────────────┐
+ * │                           ReplicationLog                             │
+ * │                                                                      │
+ * │  ┌─────────────┐     ┌────────────────────────────────────────────┐  │
+ * │  │             │     │                                            │  │
+ * │  │  Producers  │     │  Disruptor Ring Buffer                     │  │
+ * │  │  (append/   │────▶│  ┌─────────┐ ┌─────────┐ ┌─────────┐       │  │
+ * │  │   sync)     │     │  │ Event 1 │ │ Event 2 │ │ Event 3 │ ...   │  │
+ * │  │             │     │  └─────────┘ └─────────┘ └─────────┘       │  │
+ * │  └─────────────┘     └────────────────────────────────────────────┘  │
+ * │                                │                                     │
+ * │                                │                                     │
+ * │                                ▼                                     │
+ * │  ┌─────────────────────────────────────────────────────────────┐     │
+ * │  │                                                             │     │
+ * │  │  LogEventHandler                                            │     │
+ * │  │  ┌──────────────────────────────────────────────────────┐   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  │  - Batch Management                                  │   │     │
+ * │  │  │  - Writer Rotation                                   │   │     │
+ * │  │  │  - Error Handling                                    │   │     │
+ * │  │  │  - Mode Transitions                                  │   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  └──────────────────────────────────────────────────────┘   │     │
+ * │  │                             │                               │     │
+ * │  │                             ▼                               │     │
+ * │  │  ┌──────────────────────────────────────────────────────┐   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  │  LogFileWriter                                       │   │     │
+ * │  │  │  - File Management                                   │   │     │
+ * │  │  │  - Compression                                       │   │     │
+ * │  │  │  - HDFS Operations                                   │   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  └──────────────────────────────────────────────────────┘   │     │
+ * │  └─────────────────────────────────────────────────────────────┘     │
+ * └──────────────────────────────────────────────────────────────────────┘
+ * </pre>
+ * <p>
+ * The Disruptor provides a high-performance ring buffer that decouples the 
API from the complexity
+ * of writer management. Producers (callers of append/sync) simply publish 
events to the ring
+ * buffer and generally return quickly, except for sync(), where the writer 
will suspend the caller
+ * until the sync operation is successful. The LogEventHandler processes these 
events, handling the
+ * complexity of batching mutations for efficiency, rotating writers based on 
time or size, error
+ * handling and retries, and mode transitions for store-and-forward.
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2",
+    "MS_EXPOSE_REP" }, justification = "Intentional")
+public class ReplicationLog {
+
+    /** The path on the standby HDFS where log files should be written. (The 
"IN" directory.) */
+    public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+        "phoenix.replication.log.standby.hdfs.url";
+    /**
+     * The path on the active HDFS where log files should be written when we 
have fallen back to
+     * store and forward mode. (The "OUT" directory.)
+     */
+    public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+        "phoenix.replication.log.fallback.hdfs.url";
+    /**
+     * The number of shards (subfolders) to maintain in the "IN" directory.
+     * <p>
+     * Shard directories have the format shard-NNNNN, e.g. shard-00001. The 
maximum value is
+     * 100000.
+     */
+    public static final String REPLICATION_NUM_SHARDS_KEY = 
"phoenix.replication.log.shards";
+    public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+    public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+    /** Replication log rotation time trigger, default is 1 minute */
+    public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+        "phoenix.replication.log.rotation.time.ms";
+    public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 
1000L;
+    /** Replication log rotation size trigger, default is 256 MB */
+    public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+        "phoenix.replication.log.rotation.size.bytes";
+    public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 
* 1024 * 1024L;
+    /** Replication log rotation size trigger percentage, default is 0.95 */
+    public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+        "phoenix.replication.log.rotation.size.percentage";
+    public static final double 
DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+    /** Replication log compression, default is "NONE" */
+    public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+        "phoenix.replication.log.compression";
+    public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = 
"NONE";
+    public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+        "phoenix.replication.log.ringbuffer.size";
+    public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 
32;  // Too big?
+    public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+        "phoenix.replication.log.sync.timeout.ms";
+    public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+    public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+        "phoenix.replication.log.sync.retries";
+    public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+
+    public static final String SHARD_DIR_FORMAT = "shard%05d";
+    public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+    static final byte EVENT_TYPE_DATA = 0;
+    static final byte EVENT_TYPE_SYNC = 1;
+
+    static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+
+    protected static volatile ReplicationLog instance;
+
+    protected final Configuration conf;
+    protected final ServerName serverName;
+    protected FileSystem standbyFs;
+    protected FileSystem fallbackFs; // For store-and-forward (future use)
+    protected int numShards;
+    protected URI standbyUrl;
+    protected URI fallbackUrl; // For store-and-forward (future use)
+    protected final long rotationTimeMs;
+    protected final long rotationSizeBytes;
+    protected final Compression.Algorithm compression;
+    protected final ReentrantLock lock = new ReentrantLock();
+    protected volatile LogFileWriter currentWriter; // Current writer
+    protected final AtomicLong lastRotationTime = new AtomicLong();
+    protected final AtomicLong writerGeneration = new AtomicLong();
+    protected ScheduledExecutorService rotationExecutor;
+    protected final int ringBufferSize;
+    protected final long syncTimeoutMs;
+    protected Disruptor<LogEvent> disruptor;
+    protected RingBuffer<LogEvent> ringBuffer;
+    protected final ConcurrentHashMap<Path, Object> shardMap = new 
ConcurrentHashMap<>();
+    protected final MetricsReplicationLogSource metrics;
+    protected volatile boolean isClosed = false;
+
+    /**
+     * Tracks the current replication mode of the ReplicationLog.
+     * <p>
+     * The replication mode determines how mutations are handled:
+     * <ul>
+     *   <li>SYNC: Normal operation where mutations are written directly to 
the standby cluster's
+     *   HDFS.
+     *   This is the default and primary mode of operation.</li>
+     *   <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS 
is unavailable.
+     *   Mutations are stored locally and will be forwarded when connectivity 
is restored.</li>
+     *   <li>SYNC_AND_FORWARD: Transitional mode where new mutations are 
written directly to the
+     *   standby cluster while concurrently draining the local queue of 
previously stored
+     *   mutations.</li>
+     * </ul>
+     * <p>
+     * Mode transitions occur automatically based on the availability of the 
standby cluster's HDFS
+     * and the state of the local mutation queue.
+     */
+    protected enum ReplicationMode {
+        /**
+         * Normal operation where mutations are written directly to the 
standby cluster's HDFS.
+         * This is the default and primary mode of operation.
+         */
+        SYNC,
+
+        /**
+         * Fallback mode when the standby cluster's HDFS is unavailable. 
Mutations are stored
+         * locally and will be forwarded when connectivity is restored.
+         */
+        STORE_AND_FORWARD,
+
+        /**
+         * Transitional mode where new mutations are written directly to the 
standby cluster
+         * while concurrently draining the local queue of previously stored 
mutations. This mode
+         * is entered when connectivity to the standby cluster is restored 
while there are still
+         * mutations in the local queue.
+         */
+        SYNC_AND_FORWARD;
+    }
+
+    /** The reason for requesting a log rotation. */
+    protected enum RotationReason {
+        /** Rotation requested due to time threshold being exceeded. */
+        TIME,
+        /** Rotation requested due to size threshold being exceeded. */
+        SIZE,
+        /** Rotation requested due to an error condition. */
+        ERROR;
+    }
+
+    /**
+     * The current replication mode. Always SYNC for now.
+     * <p>TODO: Implement mode transitions to STORE_AND_FORWARD when standby 
becomes unavailable.
+     * <p>TODO: Implement mode transitions to SYNC_AND_FORWARD when draining 
queue.
+     */
+    protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
+
+    // TODO: Add configuration keys for store-and-forward behavior
+    // - Maximum retry attempts before switching to store-and-forward
+    // - Retry delay between attempts
+    // - Queue drain batch size
+    // - Queue drain interval
+
+    // TODO: Add state tracking fields
+    // - Queue of pending changes when in store-and-forward mode
+    // - Timestamp of last successful standby write
+    // - Error count for tracking consecutive failures
+
+    // TODO: Add methods for state transitions
+    // - switchToStoreAndForward() - Called when standby becomes unavailable
+    // - switchToSync() - Called when standby becomes available again
+    // - drainQueue() - Background task to process queued changes
+
+    // TODO: Enhance error handling in LogEventHandler
+    // - Track consecutive failures
+    // - Switch to store-and-forward after max retries
+    // - Implement queue draining when in SYNC_AND_FORWARD state
+
+    /**
+     * Gets the singleton instance of the ReplicationLogManager using the lazy 
initializer pattern.
+     * Initializes the instance if it hasn't been created yet.
+     * @param conf Configuration object.
+     * @param serverName The server name.
+     * @return The singleton ReplicationLogManager instance.
+     * @throws IOException If initialization fails.
+     */
+    public static ReplicationLog get(Configuration conf, ServerName serverName)
+          throws IOException {
+        if (instance == null) {
+            synchronized (ReplicationLog.class) {
+                if (instance == null) {
+                    // Complete initialization before assignment
+                    ReplicationLog logManager = new ReplicationLog(conf, 
serverName);
+                    logManager.init();
+                    instance = logManager;
+                }
+            }
+        }
+        return instance;
+    }
+
+    protected ReplicationLog(Configuration conf, ServerName serverName) {
+        this.conf = conf;
+        this.serverName = serverName;
+        this.rotationTimeMs = 
conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY,
+            DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+        long rotationSize = 
conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+            DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+        double rotationSizePercent = 
conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+            DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+        this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
+        this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, 
DEFAULT_REPLICATION_NUM_SHARDS);
+        String compressionName = 
conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+            DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+        Compression.Algorithm compression = Compression.Algorithm.NONE;
+        if (!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName)) {
+            try {
+                compression = 
Compression.getCompressionAlgorithmByName(compressionName);
+            } catch (IllegalArgumentException e) {
+                LOG.warn("Unknown compression type " + compressionName + ", using NONE", e);
+            }
+        }
+        this.compression = compression;
+        this.ringBufferSize = conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
+            DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+        this.syncTimeoutMs = conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY,
+            DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
+        this.metrics = createMetricsSource();
+    }
+
+    protected MetricsReplicationLogSource createMetricsSource() {
+        return new MetricsReplicationLogSourceImpl();
+    }
+
+    public MetricsReplicationLogSource getMetrics() {
+        return metrics;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void init() throws IOException {
+        if (numShards > MAX_REPLICATION_NUM_SHARDS) {
+            throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + numShards
+                + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
+        }
+        initializeFileSystems();
+        // Start time based rotation.
+        lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
+        startRotationExecutor();
+        // Create the initial writer. Do this before we call 
LogEventHandler.init().
+        currentWriter = createNewWriter(standbyFs, standbyUrl);
+        // Initialize the Disruptor. We use ProducerType.MULTI because 
multiple handlers might
+        // call append concurrently. We use YieldingWaitStrategy for low 
latency. When the ring
+        // buffer is full (controlled by 
REPLICATION_WRITER_RINGBUFFER_SIZE_KEY), producers
+        // calling ringBuffer.next() will effectively block (by 
yielding/spinning), creating
+        // backpressure on the callers. This ensures appends don't proceed 
until there is space.
+        disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
+            new 
ThreadFactoryBuilder().setNameFormat("ReplicationLogEventHandler-%d")
+                .setDaemon(true).build(),
+            ProducerType.MULTI, new YieldingWaitStrategy());
+        LogEventHandler eventHandler = new LogEventHandler();
+        eventHandler.init();
+        disruptor.handleEventsWith(eventHandler);
+        LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+        disruptor.setDefaultExceptionHandler(exceptionHandler);
+        ringBuffer = disruptor.start();
+        LOG.info("ReplicationLogWriter started with ring buffer size {}", 
ringBufferSize);
+    }
+
+    /**
+     * Append a mutation to the log. This method is non-blocking and returns 
quickly, unless the
+     * ring buffer is full. The actual write happens asynchronously. We expect 
multiple append()
+     * calls followed by a sync(). The appends will be batched by the 
Disruptor. Should the ring
+     * buffer become full, which is not expected under normal operation but 
could (and should)
+     * happen if the log file writer is unable to make progress, due to a HDFS 
level disruption.
+     * Should we enter that condition this method will block until the append 
can be inserted.
+     * @param tableName The name of the HBase table the mutation applies to.
+     * @param commitId  The commit identifier (e.g., SCN) associated with the 
mutation.
+     * @param mutation  The HBase Mutation (Put or Delete) to be logged.
+     * @throws IOException If the writer is closed or if the ring buffer is 
full.
+     */
+    public void append(String tableName, long commitId, Mutation mutation) 
throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, 
commitId, mutation);
+        }
+        if (isClosed) {
+            throw new IOException("Closed");
+        }
+        long startTime = System.nanoTime();
+        // ringBuffer.next() claims the next sequence number. Because we 
initialize the Disruptor
+        // with ProducerType.MULTI and the blocking YieldingWaitStrategy this 
call WILL BLOCK if
+        // the ring buffer is full, thus providing backpressure to the callers.
+        long sequence = ringBuffer.next();
+        try {
+            LogEvent event = ringBuffer.get(sequence);
+            event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, 
mutation), null,
+                sequence);
+            metrics.updateAppendTime(System.nanoTime() - startTime);
+        } finally {
+            // Update ring buffer events metric
+            ringBuffer.publish(sequence);
+        }
+    }
+
+    /**
+     * Ensures all previously appended records are durably persisted. This 
method blocks until the
+     * sync operation completes or fails, potentially after internal retries. 
All in flight appends
+     * are batched and provided to the underlying LogWriter, which will then 
be synced. If there is
+     * a problem syncing the LogWriter we will retry, up to the retry limit, 
rolling the writer for
+     * each retry.
+     * <p>
+     * NOTE: When the ReplicationLogManager is capable of switching between 
synchronous and
+     * fallback (store-and-forward) writers, then this will be pretty bullet 
proof. Right now we
+     * will still try to roll the synchronous writer a few times before giving 
up.
+     * @throws IOException If the sync operation fails after retries, or if 
interrupted.
+     */
+    public void sync() throws IOException {

Review Comment:
   Yes, unless there are no pending in-flight appends. I think it has to be 
this way. Any caller of sync() expects that, when the call returns, all of its 
in-flight appends have been persisted and the sync has been propagated 
through to the underlying filesystem. 
   
   My focus now is on correctness, and simplicity of implementation is part of 
that. If callers need sync calls to be optimized, that concern is pushed to 
the callers for now.
   
   I do think we should do sync merging/consolidation as a follow-up issue. The 
core idea is that the LogEventHandler, when it processes a sync event, should 
check whether other subsequent sync events have been posted in the ring buffer 
(a small, bounded lookahead?) that can be satisfied by the same underlying 
writer.sync() operation.
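   
   To make the follow-up idea concrete, here is a rough, self-contained sketch 
of bounded-lookahead sync merging. It does not use the Disruptor or any class 
from this PR: Event, CountingWriter, process() and demo() are hypothetical 
stand-ins, and a plain Deque plays the role of the ring buffer. It also 
assumes the underlying sync cannot fail, so there is no retry or error 
propagation here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only: none of these names exist in the PR.
final class SyncMergeSketch {

    /** Stand-in for a ring buffer event: either a data record or a sync request. */
    static final class Event {
        final String record;                     // non-null for data events
        final CompletableFuture<Void> syncDone;  // non-null for sync events

        private Event(String record, CompletableFuture<Void> syncDone) {
            this.record = record;
            this.syncDone = syncDone;
        }

        static Event data(String record) { return new Event(record, null); }
        static Event sync() { return new Event(null, new CompletableFuture<>()); }
        boolean isSync() { return syncDone != null; }
    }

    /** Stand-in for the log file writer; it only counts sync() calls. */
    static final class CountingWriter {
        final List<String> appended = new ArrayList<>();
        int syncCalls = 0;

        void append(String record) { appended.add(record); }
        void sync() { syncCalls++; }
    }

    /** Processes all queued events, merging syncs found within a bounded lookahead. */
    static void process(Deque<Event> queue, CountingWriter writer, int maxLookahead) {
        Event event;
        while ((event = queue.poll()) != null) {
            if (!event.isSync()) {
                writer.append(event.record);
                continue;
            }
            List<CompletableFuture<Void>> waiters = new ArrayList<>();
            waiters.add(event.syncDone);
            // Bounded lookahead over events that are already queued. Data events are
            // appended first, so every merged sync still covers its earlier appends.
            for (int i = 0; i < maxLookahead && !queue.isEmpty(); i++) {
                Event next = queue.poll();
                if (next.isSync()) {
                    waiters.add(next.syncDone);
                } else {
                    writer.append(next.record);
                }
            }
            writer.sync(); // one underlying sync satisfies every collected waiter
            for (CompletableFuture<Void> waiter : waiters) {
                waiter.complete(null);
            }
        }
    }

    /** Runs a fixed scenario and returns the number of underlying sync() calls. */
    static int demo(int maxLookahead) {
        Deque<Event> queue = new ArrayDeque<>();
        queue.add(Event.data("a"));
        queue.add(Event.sync());
        queue.add(Event.data("b"));
        queue.add(Event.sync());
        queue.add(Event.sync());
        CountingWriter writer = new CountingWriter();
        process(queue, writer, maxLookahead);
        return writer.syncCalls;
    }

    public static void main(String[] args) {
        System.out.println("no lookahead: " + demo(0) + " syncs");
        System.out.println("lookahead 4:  " + demo(4) + " syncs");
    }
}
```

   In this scenario three sync requests cost three underlying syncs with no 
lookahead and one with a lookahead of 4. A real version would also have to 
deal with sync failures, retries and writer rotation, which this sketch 
leaves out.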



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.