tkhurana commented on code in PR #2144: URL: https://github.com/apache/phoenix/pull/2144#discussion_r2105515603
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java: ##########
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+/**
+ * Manages the lifecycle of replication log files on the active cluster side. It handles log
+ * rotation based on time and size thresholds and provides the currently active LogFileWriter.
+ * This class is intended to be thread-safe.
+ * <p>
+ * Architecture Overview:
+ * <pre>
+ * ┌────────────────────────────────────────────────────────────────────────┐
+ * │                             ReplicationLog                              │
+ * │                                                                        │
+ * │ ┌─────────────┐      ┌────────────────────────────────────────────┐    │
+ * │ │             │      │                                            │    │
+ * │ │  Producers  │      │           Disruptor Ring Buffer            │    │
+ * │ │  (append/   │─────▶│  ┌─────────┐  ┌─────────┐  ┌─────────┐     │    │
+ * │ │   sync)     │      │  │ Event 1 │  │ Event 2 │  │ Event 3 │ ... │    │
+ * │ │             │      │  └─────────┘  └─────────┘  └─────────┘     │    │
+ * │ └─────────────┘      └────────────────────────────────────────────┘    │
+ * │                                              │                         │
+ * │                                              │                         │
+ * │                                              ▼                         │
+ * │   ┌─────────────────────────────────────────────────────────────┐      │
+ * │   │                                                             │      │
+ * │   │                       LogEventHandler                       │      │
+ * │   │  ┌──────────────────────────────────────────────────────┐  │      │
+ * │   │  │                                                      │  │      │
+ * │   │  │  - Batch Management                                  │  │      │
+ * │   │  │  - Writer Rotation                                   │  │      │
+ * │   │  │  - Error Handling                                    │  │      │
+ * │   │  │  - Mode Transitions                                  │  │      │
+ * │   │  │                                                      │  │      │
+ * │   │  └──────────────────────────────────────────────────────┘  │      │
+ * │   │                             │                               │      │
+ * │   │                             ▼                               │      │
+ * │   │  ┌──────────────────────────────────────────────────────┐  │      │
+ * │   │  │                                                      │  │      │
+ * │   │  │                    LogFileWriter                     │  │      │
+ * │   │  │  - File Management                                   │  │      │
+ * │   │  │  - Compression                                       │  │      │
+ * │   │  │  - HDFS Operations                                   │  │      │
+ * │   │  │                                                      │  │      │
+ * │   │  └──────────────────────────────────────────────────────┘  │      │
+ * │   └─────────────────────────────────────────────────────────────┘      │
+ * └────────────────────────────────────────────────────────────────────────┘
+ * </pre>
+ * <p>
+ * The Disruptor provides a high-performance ring buffer that decouples the API from the
+ * complexity of writer management. Producers (callers of append/sync) simply publish events to
+ * the ring buffer and generally return quickly, except for sync(), where the writer will suspend
+ * the caller until the sync operation is successful. The LogEventHandler processes these events,
+ * handling the complexity of batching mutations for efficiency, rotating writers based on time
+ * or size, error handling and retries, and mode transitions for store-and-forward.
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2",
+    "MS_EXPOSE_REP" }, justification = "Intentional")
+public class ReplicationLog {
+
+  /** The path on the standby HDFS where log files should be written. (The "IN" directory.) */
+  public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+      "phoenix.replication.log.standby.hdfs.url";
+  /**
+   * The path on the active HDFS where log files should be written when we have fallen back to
+   * store and forward mode. (The "OUT" directory.)
+   */
+  public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+      "phoenix.replication.log.fallback.hdfs.url";
+  /**
+   * The number of shards (subfolders) to maintain in the "IN" directory.
+   * <p>
+   * Shard directories have the format shard-NNNNN, e.g. shard-00001. The maximum value is
+   * 100000.
+   */
+  public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
+  public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+  public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+  /** Replication log rotation time trigger, default is 1 minute */
+  public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+      "phoenix.replication.log.rotation.time.ms";
+  public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
+  /** Replication log rotation size trigger, default is 256 MB */
+  public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+      "phoenix.replication.log.rotation.size.bytes";
+  public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
+  /** Replication log rotation size trigger percentage, default is 0.95 */
+  public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+      "phoenix.replication.log.rotation.size.percentage";
+  public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+  /** Replication log compression, default is "NONE" */
+  public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+      "phoenix.replication.log.compression";
+  public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
+  public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+      "phoenix.replication.log.ringbuffer.size";
+  public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32; // Too big?
+  public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+      "phoenix.replication.log.sync.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+  public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+      "phoenix.replication.log.sync.retries";
+  public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+
+  public static final String SHARD_DIR_FORMAT = "shard%05d";
+  public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+  static final byte EVENT_TYPE_DATA = 0;
+  static final byte EVENT_TYPE_SYNC = 1;
+
+  static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+
+  protected static volatile ReplicationLog instance;
+
+  protected final Configuration conf;
+  protected final ServerName serverName;
+  protected FileSystem standbyFs;
+  protected FileSystem fallbackFs; // For store-and-forward (future use)
+  protected int numShards;
+  protected URI standbyUrl;
+  protected URI fallbackUrl; // For store-and-forward (future use)
+  protected final long rotationTimeMs;
+  protected final long rotationSizeBytes;
+  protected final Compression.Algorithm compression;
+  protected final ReentrantLock lock = new ReentrantLock();
+  protected volatile LogFileWriter currentWriter; // Current writer
+  protected final AtomicLong lastRotationTime = new AtomicLong();
+  protected final AtomicLong writerGeneration = new AtomicLong();
+  protected ScheduledExecutorService rotationExecutor;
+  protected final int ringBufferSize;
+  protected final long syncTimeoutMs;
+  protected Disruptor<LogEvent> disruptor;
+  protected RingBuffer<LogEvent> ringBuffer;
+  protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
+  protected final MetricsReplicationLogSource metrics;
+  protected volatile boolean isClosed = false;
+
+  /**
+   * Tracks the current replication mode of the ReplicationLog.
+   * <p>
+   * The replication mode determines how mutations are handled:
+   * <ul>
+   * <li>SYNC: Normal operation where mutations are written directly to the standby cluster's
+   * HDFS. This is the default and primary mode of operation.</li>
+   * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable.
+   * Mutations are stored locally and will be forwarded when connectivity is restored.</li>
+   * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the
+   * standby cluster while concurrently draining the local queue of previously stored
+   * mutations.</li>
+   * </ul>
+   * <p>
+   * Mode transitions occur automatically based on the availability of the standby cluster's
+   * HDFS and the state of the local mutation queue.
+   */
+  protected enum ReplicationMode {
+    /**
+     * Normal operation where mutations are written directly to the standby cluster's HDFS.
+     * This is the default and primary mode of operation.
+     */
+    SYNC,
+
+    /**
+     * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored
+     * locally and will be forwarded when connectivity is restored.
+     */
+    STORE_AND_FORWARD,
+
+    /**
+     * Transitional mode where new mutations are written directly to the standby cluster
+     * while concurrently draining the local queue of previously stored mutations. This mode
+     * is entered when connectivity to the standby cluster is restored while there are still
+     * mutations in the local queue.
+     */
+    SYNC_AND_FORWARD;
+  }
+
+  /** The reason for requesting a log rotation. */
+  protected enum RotationReason {
+    /** Rotation requested due to time threshold being exceeded. */
+    TIME,
+    /** Rotation requested due to size threshold being exceeded. */
+    SIZE,
+    /** Rotation requested due to an error condition. */
+    ERROR;
+  }
+
+  /**
+   * The current replication mode. Always SYNC for now.
+   * <p>TODO: Implement mode transitions to STORE_AND_FORWARD when standby becomes unavailable.
+   * <p>TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
+   */
+  protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
+
+  // TODO: Add configuration keys for store-and-forward behavior
+  // - Maximum retry attempts before switching to store-and-forward
+  // - Retry delay between attempts
+  // - Queue drain batch size
+  // - Queue drain interval
+
+  // TODO: Add state tracking fields
+  // - Queue of pending changes when in store-and-forward mode
+  // - Timestamp of last successful standby write
+  // - Error count for tracking consecutive failures
+
+  // TODO: Add methods for state transitions
+  // - switchToStoreAndForward() - Called when standby becomes unavailable
+  // - switchToSync() - Called when standby becomes available again
+  // - drainQueue() - Background task to process queued changes
+
+  // TODO: Enhance error handling in LogEventHandler
+  // - Track consecutive failures
+  // - Switch to store-and-forward after max retries
+  // - Implement queue draining when in SYNC_AND_FORWARD state
+
+  /**
+   * Gets the singleton instance of the ReplicationLog using the lazy initializer pattern.
+   * Initializes the instance if it hasn't been created yet.
+   * @param conf Configuration object.
+   * @param serverName The server name.
+   * @return The singleton ReplicationLog instance.
+   * @throws IOException If initialization fails.
+   */
+  public static ReplicationLog get(Configuration conf, ServerName serverName)
+      throws IOException {
+    if (instance == null) {
+      synchronized (ReplicationLog.class) {
+        if (instance == null) {
+          // Complete initialization before assignment
+          ReplicationLog logManager = new ReplicationLog(conf, serverName);
+          logManager.init();
+          instance = logManager;
+        }
+      }
+    }
+    return instance;
+  }
+
+  protected ReplicationLog(Configuration conf, ServerName serverName) {
+    this.conf = conf;
+    this.serverName = serverName;
+    this.rotationTimeMs = conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY,
+        DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+    long rotationSize = conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+        DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+    double rotationSizePercent = conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+        DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+    this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
+    this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, DEFAULT_REPLICATION_NUM_SHARDS);
+    String compressionName = conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+        DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+    Compression.Algorithm compression = Compression.Algorithm.NONE;
+    if (!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName)) {
+      try {
+        compression = Compression.getCompressionAlgorithmByName(compressionName);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Unknown compression type " + compressionName + ", using NONE", e);
+      }
+    }
+    this.compression = compression;
+    this.ringBufferSize = conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
+        DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+    this.syncTimeoutMs = conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY,
+        DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
+    this.metrics = createMetricsSource();
+  }
+
+  protected MetricsReplicationLogSource createMetricsSource() {
+    return new MetricsReplicationLogSourceImpl();
+  }
+
+  public MetricsReplicationLogSource getMetrics() {
+    return metrics;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void init() throws IOException {
+    if (numShards > MAX_REPLICATION_NUM_SHARDS) {
+      throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + numShards
+          + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
+    }
+    initializeFileSystems();
+    // Start time based rotation.
+    lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
+    startRotationExecutor();
+    // Create the initial writer. Do this before we call LogEventHandler.init().
+    currentWriter = createNewWriter(standbyFs, standbyUrl);
+    // Initialize the Disruptor. We use ProducerType.MULTI because multiple handlers might
+    // call append concurrently. We use YieldingWaitStrategy for low latency. When the ring
+    // buffer is full (controlled by REPLICATION_LOG_RINGBUFFER_SIZE_KEY), producers
+    // calling ringBuffer.next() will effectively block (by yielding/spinning), creating
+    // backpressure on the callers. This ensures appends don't proceed until there is space.
+    disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
+        new ThreadFactoryBuilder().setNameFormat("ReplicationLogEventHandler-%d")
+            .setDaemon(true).build(),
+        ProducerType.MULTI, new YieldingWaitStrategy());
+    LogEventHandler eventHandler = new LogEventHandler();
+    eventHandler.init();
+    disruptor.handleEventsWith(eventHandler);
+    LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+    disruptor.setDefaultExceptionHandler(exceptionHandler);
+    ringBuffer = disruptor.start();
+    LOG.info("ReplicationLog started with ring buffer size {}", ringBufferSize);
+  }
+
+  /**
+   * Append a mutation to the log. This method is non-blocking and returns quickly, unless the
+   * ring buffer is full. The actual write happens asynchronously. We expect multiple append()
+   * calls followed by a sync(). The appends will be batched by the Disruptor. The ring buffer
+   * becoming full is not expected under normal operation, but it can (and should) happen if the
+   * log file writer is unable to make progress due to an HDFS level disruption. In that
+   * condition this method will block until the append can be inserted.
+   * @param tableName The name of the HBase table the mutation applies to.
+   * @param commitId The commit identifier (e.g., SCN) associated with the mutation.
+   * @param mutation The HBase Mutation (Put or Delete) to be logged.
+   * @throws IOException If the writer is closed.
+   */
+  public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
+    }
+    if (isClosed) {
+      throw new IOException("Closed");
+    }
+    long startTime = System.nanoTime();
+    // ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
+    // with ProducerType.MULTI, this call WILL BLOCK (by yielding/spinning) if the ring
+    // buffer is full, thus providing backpressure to the callers.
+    long sequence = ringBuffer.next();
+    try {
+      LogEvent event = ringBuffer.get(sequence);
+      event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, mutation), null,
+          sequence);
+      metrics.updateAppendTime(System.nanoTime() - startTime);
+    } finally {
+      // Publish the claimed sequence so the event becomes visible to the consumer.
+      ringBuffer.publish(sequence);
+    }
+  }
+
+  /**
+   * Ensures all previously appended records are durably persisted. This method blocks until the
+   * sync operation completes or fails, potentially after internal retries. All in-flight appends
+   * are batched and provided to the underlying LogWriter, which will then be synced. If there is
+   * a problem syncing the LogWriter we will retry, up to the retry limit, rolling the writer for
+   * each retry.
+   * <p>
+   * NOTE: When the ReplicationLog is capable of switching between synchronous and
+   * fallback (store-and-forward) writers, then this will be pretty bullet proof. Right now we
+   * will still try to roll the synchronous writer a few times before giving up.
+   * @throws IOException If the sync operation fails after retries, or if interrupted.
+   */
+  public void sync() throws IOException {

Review Comment:
   If I understand this correctly, we will publish a SYNC event for every sync() call by the
   RPC handler, and will do a filesystem sync for every SYNC event. Moreover, each SYNC will
   also create a new block.
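   To make the concern concrete: the usual way to avoid one filesystem sync per SYNC event in a
   Disruptor pipeline is to coalesce on the consumer side using the `endOfBatch` flag that the
   Disruptor passes to `EventHandler.onEvent()`. The following is an illustrative sketch only,
   not code from this PR; the `Event` and `Writer` types are simplified stand-ins for the PR's
   `LogEvent` and `LogFileWriter`.

   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   import com.lmax.disruptor.EventHandler;

   class CoalescingSyncHandler implements EventHandler<CoalescingSyncHandler.Event> {

     static final byte DATA = 0;
     static final byte SYNC = 1;

     /** Simplified stand-in for the PR's LogEvent. */
     static class Event {
       byte type;
       Object record;                  // payload for DATA events
       CompletableFuture<Void> future; // completed once a SYNC is durable
     }

     /** Simplified stand-in for the PR's LogFileWriter. */
     interface Writer {
       void append(Object record) throws IOException; // buffered, no filesystem round trip
       void sync() throws IOException;                // the expensive hflush/hsync
     }

     private final Writer writer;
     // Futures of SYNC events seen so far in the current batch, not yet made durable.
     private final List<CompletableFuture<Void>> pendingSyncs = new ArrayList<>();

     CoalescingSyncHandler(Writer writer) {
       this.writer = writer;
     }

     @Override
     public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
       if (event.type == DATA) {
         writer.append(event.record);
       } else {
         pendingSyncs.add(event.future); // defer: do not sync per SYNC event
       }
       // One filesystem sync covers every SYNC event claimed in this batch. If N RPC
       // handlers published SYNC events concurrently, all N resolve on a single sync().
       if (endOfBatch && !pendingSyncs.isEmpty()) {
         try {
           writer.sync();
           pendingSyncs.forEach(f -> f.complete(null));
         } catch (IOException e) {
           pendingSyncs.forEach(f -> f.completeExceptionally(e));
         } finally {
           pendingSyncs.clear();
         }
       }
     }
   }
   ```

   This is the classic group-commit shape; HBase's FSHLog uses essentially the same
   ring-buffer-plus-batched-sync design, where many handler threads block on sync futures that a
   single consumer thread completes in batches.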
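   For completeness, a companion sketch of the producer side, again with the stand-in types
   above: sync() publishes a SYNC event carrying a CompletableFuture and parks the caller until
   the consumer completes it or a timeout elapses. In the PR itself the future is presumably the
   third argument to LogEvent.setValues(), which append() passes as null for DATA events; that
   detail is an assumption here.

   ```java
   import java.io.IOException;
   import java.io.InterruptedIOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   import com.lmax.disruptor.RingBuffer;

   class SyncPublisher {

     private final RingBuffer<CoalescingSyncHandler.Event> ringBuffer;
     private final long syncTimeoutMs;

     SyncPublisher(RingBuffer<CoalescingSyncHandler.Event> ringBuffer, long syncTimeoutMs) {
       this.ringBuffer = ringBuffer;
       this.syncTimeoutMs = syncTimeoutMs;
     }

     public void sync() throws IOException {
       CompletableFuture<Void> future = new CompletableFuture<>();
       long sequence = ringBuffer.next(); // blocks if the ring buffer is full (backpressure)
       try {
         CoalescingSyncHandler.Event event = ringBuffer.get(sequence);
         event.type = CoalescingSyncHandler.SYNC;
         event.future = future;
       } finally {
         ringBuffer.publish(sequence); // make the event visible to the consumer
       }
       try {
         // The RPC handler parks here; durability is acknowledged only after the consumer
         // has performed (or piggybacked on) a filesystem sync covering this event.
         future.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
       } catch (ExecutionException e) {
         throw new IOException("Replication log sync failed", e.getCause());
       } catch (TimeoutException e) {
         throw new IOException("Replication log sync timed out after " + syncTimeoutMs + " ms",
             e);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new InterruptedIOException("Interrupted while waiting for replication log sync");
       }
     }
   }
   ```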
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
