kadirozde commented on code in PR #2315:
URL: https://github.com/apache/phoenix/pull/2315#discussion_r2512363673


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -378,38 +719,408 @@ protected MetricsReplicationLogGroupSource 
createMetricsSource() {
         return new MetricsReplicationLogGroupSourceImpl(haGroupName);
     }
 
-    /** Close the given writer. */
-    protected void closeWriter(ReplicationLogGroupWriter writer) {
-        if (writer != null) {
-            writer.close();
+    /**
+     * Get the name for this HA Group.
+     *
+     * @return The name for this HA Group
+     */
+    public String getHAGroupName() {
+        return haGroupName;
+    }
+
+    protected HAGroupStoreManager getHAGroupStoreManager() {
+        return haGroupStoreManager;
+    }
+
+    protected Configuration getConfiguration() {
+        return conf;
+    }
+
+    protected ServerName getServerName() {
+        return serverName;
+    }
+
+    @Override
+    public String toString() {
+        return getHAGroupName();
+    }
+
+    /**
+     * Creates the top level directory on the cluster determined by the URI
+     *
+     * @param urlKey Config property for the URI
+     * @param logDirName Top level directory underneath which the shards will 
be created
+     *
+     * @return ReplicationShardDirectoryManager
+     * @throws IOException
+     */
+    private ReplicationShardDirectoryManager createShardManager(
+            String urlKey, String logDirName) throws IOException {
+        URI rootURI = getLogURI(urlKey);
+        FileSystem fs = getFileSystem(rootURI);
+        LOG.info("HAGroup {} initialized filesystem at {}", this, rootURI);
+        // root dir path is <URI>/<HAGroupName>/[in|out]
+        Path rootDirPath = new Path(new Path(rootURI.getPath(), 
getHAGroupName()), logDirName);
+        if (!fs.exists(rootDirPath)) {
+            LOG.info("HAGroup {} creating root directory at {}", this, 
rootDirPath);
+            if (!fs.mkdirs(rootDirPath)) {
+                throw new IOException("Failed to create directory: " + 
rootDirPath);
+            }
+        }
+        return new ReplicationShardDirectoryManager(conf, fs, rootDirPath);
+    }
+
+    /** create shard manager for the standby cluster */
+    protected ReplicationShardDirectoryManager createStandbyShardManager() 
throws IOException {
+        return createShardManager(REPLICATION_STANDBY_HDFS_URL_KEY, 
STANDBY_DIR);
+    }
+
+    /** create shard manager for the fallback cluster */
+    protected ReplicationShardDirectoryManager createFallbackShardManager() 
throws IOException {
+        return createShardManager(REPLICATION_FALLBACK_HDFS_URL_KEY, 
FALLBACK_DIR);
+    }
+
+    /** return shard manager for the standby cluster */
+    protected ReplicationShardDirectoryManager getStandbyShardManager() {
+        return standbyShardManager;
+    }
+
+    /** return shard manager for the fallback cluster */
+    protected ReplicationShardDirectoryManager getFallbackShardManager() {
+        return fallbackShardManager;
+    }
+
+    private URI getLogURI(String urlKey) throws IOException {
+        String urlString = conf.get(urlKey);
+        if (urlString == null || urlString.trim().isEmpty()) {
+            throw new IOException("HDFS URL not configured: " + urlKey);
+        }
+        try {
+            return new URI(urlString);
+        } catch (URISyntaxException e) {
+            throw new IOException("Invalid HDFS URL: " + urlString, e);
         }
     }
 
-    /** Create the remote (synchronous) writer. Mainly for tests. */
-    protected ReplicationLogGroupWriter createRemoteWriter() throws 
IOException {
-        ReplicationLogGroupWriter writer = new StandbyLogGroupWriter(this);
-        writer.init();
-        return writer;
+    private FileSystem getFileSystem(URI uri) throws IOException {
+        return FileSystem.get(uri, conf);
+    }
+
+    /** Create the standby(synchronous) writer */
+    protected ReplicationLog createStandbyLog() throws IOException {
+        return new ReplicationLog(this, standbyShardManager);
+    }
+
+    /** Create the fallback writer */
+    protected ReplicationLog createFallbackLog() throws IOException {
+        return new ReplicationLog(this, fallbackShardManager);
     }
 
-    /** Create the local (store and forward) writer. Mainly for tests. */
-    protected ReplicationLogGroupWriter createLocalWriter() throws IOException 
{
-        ReplicationLogGroupWriter writer = new 
StoreAndForwardLogGroupWriter(this);
-        writer.init();
-        return writer;
+    /** Returns the log forwarder for this replication group */
+    protected ReplicationLogDiscoveryForwarder getLogForwarder() {
+        return logForwarder;
     }
 
     /** Returns the currently active writer. Mainly for tests. */
-    protected ReplicationLogGroupWriter getActiveWriter() {
-        switch (mode) {
-        case SYNC:
-            return remoteWriter;
-        case SYNC_AND_FORWARD:
-            return remoteWriter;
-        case STORE_AND_FORWARD:
-            return localWriter;
-        default:
-            throw new IllegalStateException("Invalid replication mode: " + 
mode);
+    protected ReplicationLog getActiveLog() {
+        return eventHandler.getCurrentModeImpl().getReplicationLog();
+    }
+
+    protected void setHAGroupStatusToStoreAndForward() throws Exception {
+        try {
+            haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+        }
+        catch (Exception ex) {
+            LOG.info("HAGroup {} failed to set status to STORE_AND_FORWARD", 
this, ex);
+            throw ex;
+        }
+    }
+
+    protected void setHAGroupStatusToSync() throws IOException {
+        try {
+            haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
+        } catch (IOException ex) {
+            // TODO logging
+            throw ex;
+        }
+        catch (Exception ex) {
+            // TODO logging
+            throw new IOException(ex);
+        }
+    }
+
+    /**
+     * Abort when we hit a fatal error
+     *
+     * @param reason
+     * @param cause
+     */
+    protected void abort(String reason, Throwable cause) {
+        // TODO better to use abort using RegionServerServices
+        String msg = "***** ABORTING region server: " + reason + " *****";
+        if (cause != null) {
+            msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
+        }
+        LOG.error(msg);
+        if (cause != null) {
+            throw new RuntimeException(msg, cause);
+        } else {
+            throw new RuntimeException(msg);
+        }
+    }
+
+    /**
+     * Handles events from the Disruptor,
+     */
+    protected class LogEventHandler implements EventHandler<LogEvent>, 
LifecycleAware {
+        private final List<CompletableFuture<Void>> pendingSyncFutures = new 
ArrayList<>();
+        // Current replication mode implementation which will handle the events
+        private ReplicationModeImpl currentModeImpl;
+
+        public LogEventHandler() {
+        }
+
+        public void init() throws IOException {
+            initializeMode(getMode());
+        }
+
+        @VisibleForTesting
+        public ReplicationModeImpl getCurrentModeImpl() {
+            return currentModeImpl;
+        }
+
+        private void initializeMode(ReplicationMode newMode) throws 
IOException {
+            try {
+                currentModeImpl = 
newMode.createModeImpl(ReplicationLogGroup.this);
+                currentModeImpl.onEnter();
+            } catch (IOException e) {
+                LOG.error("HAGroup {} couldn't initialize mode {}",
+                        ReplicationLogGroup.this, currentModeImpl, e);
+                updateModeOnFailure(e);
+            }
+        }
+
+        private void updateModeOnFailure(IOException e) throws IOException {
+            // send the failed event to the current mode
+            ReplicationMode newMode = currentModeImpl.onFailure(e);
+            setMode(newMode);
+            currentModeImpl.onExit(true);
+            initializeMode(newMode);
+        }
+
+        /**
+         * Processes all pending sync operations by syncing the current writer 
and completing
+         * their associated futures. This method is called when we are ready 
to process a set of
+         * consolidated sync requests and performs the following steps:
+         * <ol>
+         *   <li>Syncs the current writer to ensure all data is durably 
written.</li>
+         *   <li>Completes all pending sync futures successfully.</li>
+         *   <li>Clears the list of pending sync futures.</li>
+         *   <li>Clears the current batch of records since they have been 
successfully synced.</li>
+         * </ol>
+         * @param sequence The sequence number of the last processed event
+         * @throws IOException if the sync operation fails
+         */
+        private void processPendingSyncs(long sequence) throws IOException {
+            if (pendingSyncFutures.isEmpty()) {
+                return;
+            }
+            // call sync on the current mode
+            currentModeImpl.sync();
+            // Complete all pending sync futures
+            for (CompletableFuture<Void> future : pendingSyncFutures) {
+                future.complete(null);
+            }
+            pendingSyncFutures.clear();
+            LOG.info("Sync operation completed successfully up to sequence 
{}", sequence);
+        }
+
+        /**
+         * Fails all pending sync operations with the given exception. This 
method is called when
+         * we encounter an unrecoverable error during the sync of the inner 
writer. It completes
+         * all pending sync futures that were consolidated exceptionally.
+         * <p>
+         * Note: This method does not clear the currentBatch list. The 
currentBatch must be
+         * preserved as it contains records that may need to be replayed if we 
successfully
+         * rotate to a new writer.
+         *
+         * @param sequence The sequence number of the last processed event
+         * @param e The IOException that caused the failure
+         */
+        private void failPendingSyncs(long sequence, IOException e) {
+            if (pendingSyncFutures.isEmpty()) {
+                return;
+            }
+            for (CompletableFuture<Void> future : pendingSyncFutures) {
+                future.completeExceptionally(e);
+            }
+            pendingSyncFutures.clear();
+            LOG.warn("Failed to process syncs at sequence {}", sequence, e);
+        }
+
+        /**
+         * Handle the failure while processing an event
+         *
+         * @param failedEvent Event which triggered the failure
+         * @param sequence Sequence number of the failed event
+         * @param cause Reason of failure
+         */
+        private void onFailure(LogEvent failedEvent,
+                               long sequence,
+                               IOException cause) throws IOException {
+            // fetch the in-flight appends
+            List<Record> unsyncedAppends = 
currentModeImpl.log.getCurrentBatch();
+            // try updating the mode
+            updateModeOnFailure(cause);
+            // retry the batch after updating the mode
+            replayBatch(unsyncedAppends);
+            // retry the failed event
+            replayFailedEvent(failedEvent, sequence);
+        }
+
+        /** Replay all append events which were not yet synced */
+        private void replayBatch(List<Record> unsyncedAppends) throws 
IOException {
+            for (Record r : unsyncedAppends) {
+                currentModeImpl.append(r);
+            }
+        }
+
+        /** Retry the failed event after switching mode */
+        private void replayFailedEvent(LogEvent failedEvent,
+                                       long sequence) throws IOException {
+            // now retry the event which failed
+            // only need to retry append event since for sync event we have 
already added the
+            // sync event future to the pending future list before the sync 
event can potentially
+            // fail.
+            if (failedEvent.type == EVENT_TYPE_DATA) {
+                currentModeImpl.append(failedEvent.record);
+            }
+            processPendingSyncs(sequence);
+        }
+
+        /**
+         * Processes a single event from the Disruptor ring buffer. This 
method handles both data
+         * and sync events.
+         * <p>
+         * For data events, it:
+         * <ol>
+         *   <li>Sends the append event to the current mode.</li>
+         *   <li>Processes any pending syncs if this is the end of a 
batch.</li>
+         * </ol>
+         * <p>
+         * For sync events, it:
+         * <ol>
+         *   <li>Adds the sync future to the pending list.</li>
+         *   <li>Processes any pending syncs if this is the end of a 
batch.</li>
+         * </ol>
+         * <p>
+         * If an IOException occurs, it sends the failure event to the current 
replication mode
+         * and then switches to the new mode. After switching to the new mode, 
it replays the
+         * pending batch of un-synced appends and the failed event.
+         * </p>
+         *
+         * @param event The event to process
+         * @param sequence The sequence number of the event
+         * @param endOfBatch Whether this is the last event in the current 
batch
+         * @throws Exception if the operation fails after all retries
+         */
+        @Override
+        public void onEvent(LogEvent event, long sequence, boolean endOfBatch) 
throws Exception {
+            // Calculate time spent in ring buffer
+            long currentTimeNs = System.nanoTime();
+            long ringBufferTimeNs = currentTimeNs - event.timestampNs;
+            metrics.updateRingBufferTime(ringBufferTimeNs);
+            try {
+                switch (event.type) {
+                    case EVENT_TYPE_DATA:
+                        currentModeImpl.append(event.record);
+                        // Process any pending syncs at the end of batch.
+                        if (endOfBatch) {
+                            processPendingSyncs(sequence);
+                        }
+                        return;
+                    case EVENT_TYPE_SYNC:
+                        // Add this sync future to the pending list
+                        // OK, to add the same future multiple times when we 
rewind the batch
+                        // as completing an already completed future is a no-op
+                        pendingSyncFutures.add(event.syncFuture);
+                        // Process any pending syncs at the end of batch.
+                        if (endOfBatch) {
+                            processPendingSyncs(sequence);
+                        }
+                        // after a successful sync check the mode set on the 
replication group
+                        // Doing the mode check on sync points makes the 
implementation more robust
+                        // since we can guarantee that all unsynced appends 
have been flushed to the
+                        // replication log before we switch the replication 
mode
+                        ReplicationMode newMode = getMode();
+                        if (newMode != currentModeImpl.getMode()) {
+                            // some other thread switched the mode on the 
replication group
+                            LOG.info("Mode switched at sequence {} from {} to 
{}",
+                                    sequence, currentModeImpl, newMode);
+                            // call exit on the last mode here since we can 
guarantee that the lastMode
+                            // is not processing any event like append/sync 
because this is the only thread
+                            // that is consuming the events from the ring 
buffer and handing them off to the
+                            // mode
+                            currentModeImpl.onExit(true);

Review Comment:
   `onExit` should be invoked asynchronously so that it does not add extra 
latency to the failure handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to