apurtell commented on code in PR #2278: URL: https://github.com/apache/phoenix/pull/2278#discussion_r2411935603
########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java: ########## @@ -0,0 +1,1175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.jdbc.HAGroupStoreRecord; +import org.apache.phoenix.replication.ReplicationLogTracker; +import org.apache.phoenix.replication.ReplicationLogGroup; +import org.apache.phoenix.replication.ReplicationRound; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ReplicationLogDiscoveryReplayTest { + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private Configuration conf; + private FileSystem localFs; + private URI standbyUri; + private static final String haGroupName = "testGroup"; + private static final MetricsReplicationLogTracker METRICS_REPLICATION_LOG_TRACKER = new MetricsReplicationLogTrackerReplayImpl(haGroupName); + + @Before + public void setUp() throws IOException { + conf = HBaseConfiguration.create(); + localFs = FileSystem.getLocal(conf); + standbyUri = testFolder.getRoot().toURI(); + conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString()); + } + + @After + public void tearDown() throws IOException { + localFs.delete(new Path(testFolder.getRoot().toURI()), true); + } + + /** + * Tests that the executor thread name format is correctly configured. + */ + @Test + public void testGetExecutorThreadNameFormat() throws IOException { + // Create ReplicationLogDiscoveryReplay instance + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + ReplicationLogDiscoveryReplay discovery = new ReplicationLogDiscoveryReplay(fileTracker); + + // Test that it returns the expected constant value + String result = discovery.getExecutorThreadNameFormat(); + assertEquals("Should return the expected thread name format", + "Phoenix-ReplicationLogDiscoveryReplay-%d", result); + } + + /** + * Tests the replay interval configuration with default and custom values. 
+ */ + @Test + public void testGetReplayIntervalSeconds() throws IOException { + // Create ReplicationLogDiscoveryReplay instance + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + ReplicationLogDiscoveryReplay discovery = new ReplicationLogDiscoveryReplay(fileTracker); + + // Test default value when no custom config is set + long defaultResult = discovery.getReplayIntervalSeconds(); + assertEquals("Should return default value when no custom config is set", + ReplicationLogDiscoveryReplay.DEFAULT_REPLAY_INTERVAL_SECONDS, defaultResult); + + // Test custom value when config is set + conf.setLong(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_INTERVAL_SECONDS_KEY, 120L); + long customResult = discovery.getReplayIntervalSeconds(); + assertEquals("Should return custom value when config is set", + 120L, customResult); + } + + /** + * Tests the shutdown timeout configuration with default and custom values. + */ + @Test + public void testGetShutdownTimeoutSeconds() throws IOException { + // Create ReplicationLogDiscoveryReplay instance + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + ReplicationLogDiscoveryReplay discovery = new ReplicationLogDiscoveryReplay(fileTracker); + + // Test default value when no custom config is set + long defaultResult = discovery.getShutdownTimeoutSeconds(); + assertEquals("Should return default value when no custom config is set", + ReplicationLogDiscoveryReplay.DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, defaultResult); + + // Test custom value when config is set + conf.setLong(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_SHUTDOWN_TIMEOUT_SECONDS_KEY, 45L); + long customResult = discovery.getShutdownTimeoutSeconds(); + assertEquals("Should return custom value when config is set", + 45L, customResult); + } + + /** + * Tests the executor thread count configuration with default and custom values. 
+ */ + @Test + public void testGetExecutorThreadCount() throws IOException { + // Create ReplicationLogDiscoveryReplay instance + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + ReplicationLogDiscoveryReplay discovery = new ReplicationLogDiscoveryReplay(fileTracker); + + // Test default value when no custom config is set + int defaultResult = discovery.getExecutorThreadCount(); + assertEquals("Should return default value when no custom config is set", + ReplicationLogDiscoveryReplay.DEFAULT_EXECUTOR_THREAD_COUNT, defaultResult); + + // Test custom value when config is set + conf.setInt(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_EXECUTOR_THREAD_COUNT_KEY, 3); + int customResult = discovery.getExecutorThreadCount(); + assertEquals("Should return custom value when config is set", + 3, customResult); + } + + /** + * Tests the in-progress directory processing probability configuration. + */ + @Test + public void testGetInProgressDirectoryProcessProbability() throws IOException { + // Create ReplicationLogDiscoveryReplay instance + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + ReplicationLogDiscoveryReplay discovery = new ReplicationLogDiscoveryReplay(fileTracker); + + // Test default value when no custom config is set + double defaultResult = discovery.getInProgressDirectoryProcessProbability(); + assertEquals("Should return default value when no custom config is set", + ReplicationLogDiscoveryReplay.DEFAULT_IN_PROGRESS_DIRECTORY_PROCESSING_PROBABILITY, defaultResult, 0.001); + + // Test custom value when config is set + conf.setDouble(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_IN_PROGRESS_DIRECTORY_PROCESSING_PROBABILITY_KEY, 10.5); + double customResult = discovery.getInProgressDirectoryProcessProbability(); + assertEquals("Should return custom value when config is set", + 10.5, customResult, 0.001); + } + + /** + * Tests 
the waiting buffer percentage configuration with default and custom values. + */ + @Test + public void testGetWaitingBufferPercentage() throws IOException { + // Create ReplicationLogDiscoveryReplay instance + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + ReplicationLogDiscoveryReplay discovery = new ReplicationLogDiscoveryReplay(fileTracker); + + // Test default value when no custom config is set + double defaultResult = discovery.getWaitingBufferPercentage(); + assertEquals("Should return default value when no custom config is set", + ReplicationLogDiscoveryReplay.DEFAULT_WAITING_BUFFER_PERCENTAGE, defaultResult, 0.001); + + // Test custom value when config is set + conf.setDouble(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_WAITING_BUFFER_PERCENTAGE_KEY, 20.0); + double customResult = discovery.getWaitingBufferPercentage(); + assertEquals("Should return custom value when config is set", + 20.0, customResult, 0.001); + } + + /** + * Tests initialization in DEGRADED state with both in-progress and new files present. + * Validates that lastRoundProcessed uses minimum timestamp and lastRoundInSync is preserved. 
+ */ + @Test + public void testInitializeLastRoundProcessed_DegradedStateWithInProgressAndNewFiles() throws IOException { + long currentTime = 1704153600000L; // 2024-01-02 00:00:00 + long inProgressFileTimestamp = 1704153420000L; // Earlier timestamp (23:57:00) - 3 min before current + long newFileTimestamp = 1704153540000L; // Middle timestamp (23:59:00) - 1 min before current + long lastSyncStateTime = 1704153480000L; // Between in-progress and new file (23:58:00) + long roundTimeMills = 60000L; // 1 minute + + // Expected: lastRoundProcessed uses minimum timestamp (in-progress file) + long expectedEndTime = (inProgressFileTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync uses minimum of lastSyncStateTime and file timestamps + long expectedSyncEndTime = (inProgressFileTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundInSync = new ReplicationRound(expectedSyncEndTime - roundTimeMills, expectedSyncEndTime); + + testInitializeLastRoundProcessedHelper( + currentTime, + lastSyncStateTime, + newFileTimestamp, + inProgressFileTimestamp, + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + } + + @Test + public void testInitializeLastRoundProcessed_DegradedStateWithBothFilesNewFileIsMin() throws IOException { + long currentTime = 1704153600000L; // 2024-01-02 00:00:00 + long newFileTimestamp = 1704153420000L; // Earlier timestamp (23:57:00) - 3 min before current + long inProgressFileTimestamp = 1704153540000L; // Later timestamp (23:59:00) - 1 min before current + long lastSyncStateTime = 1704153480000L; // Between new and in-progress file (23:58:00) + long roundTimeMills = 60000L; // 1 minute + + // 
Expected: lastRoundProcessed uses minimum timestamp (new file) + long expectedEndTime = (newFileTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync uses minimum of lastSyncStateTime and file timestamps + long expectedSyncEndTime = (newFileTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundInSync = new ReplicationRound(expectedSyncEndTime - roundTimeMills, expectedSyncEndTime); + + testInitializeLastRoundProcessedHelper( + currentTime, + lastSyncStateTime, + newFileTimestamp, + inProgressFileTimestamp, + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + } + + @Test + public void testInitializeLastRoundProcessed_DegradedStateWithLastSyncStateAsMin() throws IOException { + long newFileTimestamp = 1704240060000L; + long lastSyncStateTime = 1704240030000L; + long currentTime = 1704240900000L; + long roundTimeMills = 60000L; // 1 minute + + // Expected: lastRoundProcessed uses minimum of new files and current time + long expectedEndTime = (newFileTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync uses minimum of lastSyncStateTime and file timestamps + long expectedSyncEndTime = (lastSyncStateTime / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundInSync = new ReplicationRound(expectedSyncEndTime - roundTimeMills, expectedSyncEndTime); + + testInitializeLastRoundProcessedHelper( + currentTime, + lastSyncStateTime, + newFileTimestamp, + null, // no in-progress file + 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + } + + @Test + public void testInitializeLastRoundProcessed_DegradedStateWithNoFiles() throws IOException { + long currentTime = 1704326400000L; + long lastSyncStateTime = 1704326300000L; + long roundTimeMills = 60000L; // 1 minute + + // Expected: lastRoundProcessed uses current time + long expectedEndTime = (currentTime / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync uses minimum of lastSyncStateTime and current time + long expectedSyncEndTime = (lastSyncStateTime / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundInSync = new ReplicationRound(expectedSyncEndTime - roundTimeMills, expectedSyncEndTime); + + testInitializeLastRoundProcessedHelper( + currentTime, + lastSyncStateTime, + null, // no new file + null, // no in-progress file + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + } + + @Test + public void testInitializeLastRoundProcessed_SyncStateWithInProgressFiles() throws IOException { + long currentTime = 1704412800000L; + long inProgressTimestamp = 1704412680000L; // 2 min before current time + long roundTimeMills = 60000L; // 1 minute + + // Expected: uses in-progress file timestamp + long expectedEndTime = (inProgressTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync equals lastRoundProcessed in SYNC state + ReplicationRound expectedLastRoundInSync = 
expectedLastRoundProcessed; + + testInitializeLastRoundProcessedHelper( + currentTime, + null, // SYNC state - lastSyncStateTime not used + null, // no new file + inProgressTimestamp, + HAGroupStoreRecord.HAGroupState.STANDBY, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + } + + @Test + public void testInitializeLastRoundProcessed_SyncStateWithNewFiles() throws IOException { + long currentTime = 1704499200000L; + long newFileTimestamp = 1704499080000L; // 2 min before current time + long roundTimeMills = 60000L; // 1 minute + + // Expected: uses new file timestamp + long expectedEndTime = (newFileTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync equals lastRoundProcessed in SYNC state + ReplicationRound expectedLastRoundInSync = expectedLastRoundProcessed; + + testInitializeLastRoundProcessedHelper( + currentTime, + null, // SYNC state - lastSyncStateTime not used + newFileTimestamp, + null, // no in-progress file + HAGroupStoreRecord.HAGroupState.STANDBY, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + } + + @Test + public void testInitializeLastRoundProcessed_SyncStateWithBothFiles() throws IOException { + long currentTime = 1704499200000L; + long inProgressTimestamp = 1704499020000L; // Earlier timestamp - 3 min before current + long newFileTimestamp = 1704499140000L; // Later timestamp - 1 min before current + long roundTimeMills = 60000L; // 1 minute + + // Expected: uses minimum timestamp (in-progress file) + long expectedEndTime = (inProgressTimestamp / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // 
Expected: lastRoundInSync equals lastRoundProcessed in SYNC state + ReplicationRound expectedLastRoundInSync = expectedLastRoundProcessed; + + testInitializeLastRoundProcessedHelper( + currentTime, + null, // SYNC state - lastSyncStateTime not used + newFileTimestamp, + inProgressTimestamp, + HAGroupStoreRecord.HAGroupState.STANDBY, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + } + + @Test + public void testInitializeLastRoundProcessed_SyncStateWithNoFiles() throws IOException { + long currentTime = 1704585600000L; + long roundTimeMills = 60000L; // 1 minute + + // Expected: uses current time when no files exist + long expectedEndTime = (currentTime / TimeUnit.MINUTES.toMillis(1)) * TimeUnit.MINUTES.toMillis(1); + ReplicationRound expectedLastRoundProcessed = new ReplicationRound(expectedEndTime - roundTimeMills, expectedEndTime); + + // Expected: lastRoundInSync equals lastRoundProcessed in SYNC state + ReplicationRound expectedLastRoundInSync = expectedLastRoundProcessed; + + testInitializeLastRoundProcessedHelper( + currentTime, + null, // SYNC state - lastSyncStateTime not used + null, // no new file + null, // no in-progress file + HAGroupStoreRecord.HAGroupState.STANDBY, + expectedLastRoundProcessed, + expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + } + + /** + * Helper method to test initializeLastRoundProcessed with various file and state configurations. + * Handles file creation, state setup, and validation of lastRoundProcessed and lastRoundInSync. 
+ * + * @param currentTime Current time for the test + * @param lastSyncStateTime Last sync state time for HAGroupStoreRecord (use null for SYNC state) + * @param newFileTimestamp Timestamp for new file (use null to skip creating new file) + * @param inProgressFileTimestamp Timestamp for in-progress file (use null to skip creating in-progress file) + * @param haGroupState HAGroupState for the test + * @param expectedLastRoundProcessed Expected lastRoundProcessed after initialization + * @param expectedLastRoundInSync Expected lastRoundInSync after initialization + * @param expectedReplayState Expected ReplicationReplayState after initialization + */ + private void testInitializeLastRoundProcessedHelper( + long currentTime, + Long lastSyncStateTime, + Long newFileTimestamp, + Long inProgressFileTimestamp, + HAGroupStoreRecord.HAGroupState haGroupState, + ReplicationRound expectedLastRoundProcessed, + ReplicationRound expectedLastRoundInSync, + ReplicationLogDiscoveryReplay.ReplicationReplayState expectedReplayState) throws IOException { + + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + fileTracker.init(); + + try { + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager() + .getReplicationRoundDurationSeconds() * 1000L; + + // Create in-progress file if timestamp provided + if (inProgressFileTimestamp != null) { + Path inProgressDir = fileTracker.getInProgressDirPath(); + localFs.mkdirs(inProgressDir); + Path inProgressFile = new Path(inProgressDir, inProgressFileTimestamp + "_rs-1_uuid.plog"); + localFs.create(inProgressFile, true).close(); + } + + // Create new file if timestamp provided + if (newFileTimestamp != null) { + ReplicationRound newFileRound = new ReplicationRound( + newFileTimestamp - roundTimeMills, newFileTimestamp); + Path shardPath = fileTracker.getReplicationShardDirectoryManager() + .getShardDirectory(newFileRound.getStartTime()); + localFs.mkdirs(shardPath); + 
Path newFile = new Path(shardPath, newFileTimestamp + "_rs-1.plog"); + localFs.create(newFile, true).close(); + } + + // Create HAGroupStoreRecord + long recordTime = lastSyncStateTime != null ? lastSyncStateTime : currentTime; + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord( + HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, haGroupState, recordTime); + + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + discovery.initializeLastRoundProcessed(); + + // Verify lastRoundProcessed + ReplicationRound lastRoundProcessed = discovery.getLastRoundProcessed(); + assertNotNull("Last round processed should not be null", lastRoundProcessed); + assertEquals("Last round processed should match expected", + expectedLastRoundProcessed, lastRoundProcessed); + + // Verify lastRoundInSync + ReplicationRound lastRoundInSync = discovery.getLastRoundInSync(); + assertNotNull("Last round in sync should not be null", lastRoundInSync); + assertEquals("Last round in sync should match expected", + expectedLastRoundInSync, lastRoundInSync); + + // Verify state + assertEquals("Replication replay state should match expected", + expectedReplayState, discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests replay in SYNC state processing multiple rounds. + * Validates that both lastRoundProcessed and lastRoundInSync advance together. 
+ */ + @Test + public void testReplay_SyncState_ProcessMultipleRounds() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704153600000L; // 2024-01-02 00:00:00 + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + long totalWaitTime = roundTimeMills + bufferMillis; + + // Create HAGroupStoreRecord for SYNC state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, initialEndTime); + + // Set current time to allow processing 3 rounds + long currentTime = initialEndTime + (3 * totalWaitTime); + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundProcessed(initialRound); + discovery.setLastRoundInSync(initialRound); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + discovery.replay(); + + // Verify processRound was called 3 times + assertEquals("processRound should be called 3 times", 3, discovery.getProcessRoundCallCount()); + + // Verify the rounds passed to processRound + List<ReplicationRound> processedRounds = discovery.getProcessedRounds(); + ReplicationRound expectedRound1 = new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("First round should match expected", expectedRound1, processedRounds.get(0)); + + ReplicationRound expectedRound2 = new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills)); + 
assertEquals("Second round should match expected", expectedRound2, processedRounds.get(1)); + + ReplicationRound expectedRound3 = new ReplicationRound(initialEndTime + (2 * roundTimeMills), initialEndTime + (3 * roundTimeMills)); + assertEquals("Third round should match expected", expectedRound3, processedRounds.get(2)); + + // Verify lastRoundProcessed was updated to 3rd round + ReplicationRound lastRoundProcessed = discovery.getLastRoundProcessed(); + assertNotNull("Last round processed should not be null", lastRoundProcessed); + assertEquals("Last round processed should be updated to 3rd round", + expectedRound3, lastRoundProcessed); + + // Verify lastRoundInSync was also updated in SYNC state + ReplicationRound lastRoundInSync = discovery.getLastRoundInSync(); + assertNotNull("Last round in sync should not be null", lastRoundInSync); + assertEquals("Last round in sync should be updated to match last round processed in SYNC state", + expectedRound3, lastRoundInSync); + + // Verify state remains SYNC + assertEquals("State should remain SYNC", + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC, + discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests replay in DEGRADED state processing multiple rounds. + * Validates that lastRoundProcessed advances but lastRoundInSync is preserved. 
+ */ + @Test + public void testReplay_DegradedState_MultipleRounds() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704240000000L; + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + long totalWaitTime = roundTimeMills + bufferMillis; + + // Create HAGroupStoreRecord for DEGRADED state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER, initialEndTime); + + // Set current time to allow processing 3 rounds + long currentTime = initialEndTime + (3 * totalWaitTime); + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + discovery.setLastRoundProcessed(new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime)); + ReplicationRound lastRoundInSyncBeforeReplay = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundInSync(lastRoundInSyncBeforeReplay); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + + ReplicationRound lastRoundBeforeReplay = discovery.getLastRoundProcessed(); + + discovery.replay(); + + // Verify processRound was called 3 times + assertEquals("processRound should be called 3 times", 3, discovery.getProcessRoundCallCount()); + + // Verify the rounds passed to processRound + List<ReplicationRound> processedRounds = discovery.getProcessedRounds(); + ReplicationRound expectedRound1 = new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("First round should match expected", expectedRound1, 
processedRounds.get(0)); + + ReplicationRound expectedRound2 = new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills)); + assertEquals("Second round should match expected", expectedRound2, processedRounds.get(1)); + + ReplicationRound expectedRound3 = new ReplicationRound(initialEndTime + (2 * roundTimeMills), initialEndTime + (3 * roundTimeMills)); + assertEquals("Third round should match expected", expectedRound3, processedRounds.get(2)); + + // Verify lastRoundProcessed was updated + ReplicationRound lastRoundAfterReplay = discovery.getLastRoundProcessed(); + assertEquals("Last round processed should be updated to 3rd round in DEGRADED state", + expectedRound3, lastRoundAfterReplay); + + // Verify lastRoundInSync was NOT updated (preserved in DEGRADED state) + ReplicationRound lastRoundInSyncAfterReplay = discovery.getLastRoundInSync(); + assertEquals("Last round in sync should NOT be updated in DEGRADED state", + lastRoundInSyncBeforeReplay, lastRoundInSyncAfterReplay); + + // Verify state remains DEGRADED + assertEquals("State should remain DEGRADED", + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED, + discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests replay in SYNCED_RECOVERY state with rewind to lastRoundInSync. + * Validates that processing rewinds and re-processes from lastRoundInSync. 
+ */ + @Test + public void testReplay_SyncedRecoveryState_RewindToLastInSync() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704326400000L; + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + long totalWaitTime = roundTimeMills + bufferMillis; + + // Create HAGroupStoreRecord for STANDBY state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, initialEndTime); + + // Set current time to allow processing multiple rounds + long currentTime = initialEndTime + (5 * totalWaitTime); + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + // Set initial state: lastRoundProcessed is ahead, lastRoundInSync is behind + ReplicationRound lastInSyncRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + ReplicationRound currentRound = new ReplicationRound(initialEndTime + (2 * roundTimeMills), initialEndTime + (3 * roundTimeMills)); + + discovery.setLastRoundProcessed(currentRound); + discovery.setLastRoundInSync(lastInSyncRound); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY); + + discovery.replay(); + + // Verify processRound was called 6 times + // Flow: 1 round in SYNCED_RECOVERY (triggers rewind), then 5 rounds in SYNC + // getFirstRoundToProcess() uses lastRoundInSync.endTime = initialEndTime + // After processing first round in SYNCED_RECOVERY, it rewinds to lastRoundInSync + // Then continues processing from initialEndTime again (re-processing first round) + 
assertEquals("processRound should be called 6 times", 6, discovery.getProcessRoundCallCount()); + + // Verify the first round - starts from lastRoundInSync.endTime = initialEndTime (via getFirstRoundToProcess) + List<ReplicationRound> processedRounds = discovery.getProcessedRounds(); + ReplicationRound expectedFirstRound = new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("First round (SYNCED_RECOVERY) should start from lastRoundInSync.endTime", + expectedFirstRound, processedRounds.get(0)); + + // After SYNCED_RECOVERY rewind, processing restarts from lastRoundInSync.endTime = initialEndTime + // This means round 1 is processed again, then rounds 2-5 + ReplicationRound expectedSecondRound = new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("Second round (first in SYNC, re-processing) should match expected", + expectedSecondRound, processedRounds.get(1)); + + ReplicationRound expectedThirdRound = new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills)); + assertEquals("Third round should match expected", + expectedThirdRound, processedRounds.get(2)); + + ReplicationRound expectedFourthRound = new ReplicationRound(initialEndTime + (2 * roundTimeMills), initialEndTime + (3 * roundTimeMills)); + assertEquals("Fourth round should match expected", + expectedFourthRound, processedRounds.get(3)); + + ReplicationRound expectedFifthRound = new ReplicationRound(initialEndTime + (3 * roundTimeMills), initialEndTime + (4 * roundTimeMills)); + assertEquals("Fifth round should match expected", + expectedFifthRound, processedRounds.get(4)); + + ReplicationRound expectedSixthRound = new ReplicationRound(initialEndTime + (4 * roundTimeMills), initialEndTime + (5 * roundTimeMills)); + assertEquals("Sixth round should match expected", + expectedSixthRound, processedRounds.get(5)); + + // Verify lastRoundProcessed was updated + ReplicationRound lastRoundProcessed = 
discovery.getLastRoundProcessed(); + assertNotNull("Last round processed should not be null", lastRoundProcessed); + assertEquals("Last round processed should be updated to 5th round", + expectedSixthRound, lastRoundProcessed); + + // Verify lastRoundInSync was also updated in SYNC state (after rewind and transition) + ReplicationRound lastRoundInSync = discovery.getLastRoundInSync(); + assertNotNull("Last round in sync should not be null", lastRoundInSync); + assertEquals("Last round in sync should be updated to match last round processed after SYNC transition", + expectedSixthRound, lastRoundInSync); + + // Verify state transitioned to SYNC + assertEquals("State should transition to SYNC", + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC, + discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests state transition from SYNC to DEGRADED during replay processing. + * Validates that lastRoundInSync is preserved at the last SYNC round. 
+ */ + @Test + public void testReplay_StateTransition_SyncToDegradedDuringProcessing() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704412800000L; + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + long totalWaitTime = roundTimeMills + bufferMillis; + + // Create HAGroupStoreRecord for STANDBY state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, initialEndTime); + + // Set current time to allow processing 5 rounds + long currentTime = initialEndTime + (5 * totalWaitTime); + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundProcessed(initialRound); + discovery.setLastRoundInSync(initialRound); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + // Simulate listener changing state to DEGRADED after 2 rounds + discovery.setStateChangeAfterRounds(2, ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + + ReplicationRound lastRoundBeforeReplay = discovery.getLastRoundProcessed(); + + discovery.replay(); + + // Verify processRound was called 5 times (2 in SYNC, 3 in DEGRADED) + assertEquals("processRound should be called 5 times", 5, discovery.getProcessRoundCallCount()); + + // Verify the rounds passed to processRound + List<ReplicationRound> processedRounds = discovery.getProcessedRounds(); + + // First 2 rounds in SYNC mode + ReplicationRound expectedRound1 = 
new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("First round should match expected", expectedRound1, processedRounds.get(0)); + + ReplicationRound expectedRound2 = new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills)); + assertEquals("Second round should match expected", expectedRound2, processedRounds.get(1)); + + // Remaining 3 rounds in DEGRADED mode + ReplicationRound expectedRound3 = new ReplicationRound(initialEndTime + (2 * roundTimeMills), initialEndTime + (3 * roundTimeMills)); + assertEquals("Third round should match expected", expectedRound3, processedRounds.get(2)); + + ReplicationRound expectedRound4 = new ReplicationRound(initialEndTime + (3 * roundTimeMills), initialEndTime + (4 * roundTimeMills)); + assertEquals("Fourth round should match expected", expectedRound4, processedRounds.get(3)); + + ReplicationRound expectedRound5 = new ReplicationRound(initialEndTime + (4 * roundTimeMills), initialEndTime + (5 * roundTimeMills)); + assertEquals("Fifth round should match expected", expectedRound5, processedRounds.get(4)); + + // Verify lastRoundProcessed was updated to 5th round + ReplicationRound lastRoundAfterReplay = discovery.getLastRoundProcessed(); + assertEquals("Last round processed should be updated to 5th round", + expectedRound5, lastRoundAfterReplay); + + // Verify lastRoundInSync was updated only for first round (SYNC), then preserved + // State changed to DEGRADED AFTER processing round 2, but BEFORE updating lastRoundInSync for round 2 + // So lastRoundInSync should remain at round 1 (the last round fully completed in SYNC state) + ReplicationRound lastRoundInSync = discovery.getLastRoundInSync(); + assertNotNull("Last round in sync should not be null", lastRoundInSync); + assertEquals("Last round in sync should be preserved at round 1 (last fully completed SYNC round)", + expectedRound1, lastRoundInSync); + + // Verify state is now DEGRADED + assertEquals("State 
should be DEGRADED", + ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED, + discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests state transition from DEGRADED to SYNCED_RECOVERY and then to SYNC. + * Validates rewind behavior and lastRoundInSync update after SYNC transition. + */ + @Test + public void testReplay_StateTransition_DegradedToSyncedRecovery() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704499200000L; + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + long totalWaitTime = roundTimeMills + bufferMillis; + + // Create HAGroupStoreRecord for DEGRADED state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, initialEndTime); + + // Set current time to allow processing 5 rounds + long currentTime = initialEndTime + (5 * roundTimeMills) + bufferMillis; + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + + ReplicationRound lastInSyncRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundProcessed(new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills))); + discovery.setLastRoundInSync(lastInSyncRound); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + + // Simulate listener changing state to SYNCED_RECOVERY after 2 rounds + discovery.setStateChangeAfterRounds(2, 
ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY); + + discovery.replay(); + + // Verify processRound was called: 2 in DEGRADED, then 5 in SYNC + // Total: 2 + 5 = 7 calls + // First call uses getFirstRoundToProcess() which starts from lastRoundInSync.endTime = initialEndTime + // After state change to SYNCED_RECOVERY, it rewinds and continues from lastRoundInSync.endTime + assertEquals("processRound should be called 7 times", 7, discovery.getProcessRoundCallCount()); + + // Verify the rounds passed to processRound + List<ReplicationRound> processedRounds = discovery.getProcessedRounds(); + + // First 2 rounds in DEGRADED mode (starting from lastRoundInSync.endTime = initialEndTime) + // getFirstRoundToProcess() uses lastRoundInSync.endTime, not lastRoundProcessed.endTime + ReplicationRound expectedRound1 = new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("First round should start from lastRoundInSync.endTime", expectedRound1, processedRounds.get(0)); + + ReplicationRound expectedRound2 = new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills)); + assertEquals("Second round should match expected", expectedRound2, processedRounds.get(1)); + + // After 2 rounds, state changes to SYNCED_RECOVERY and rewinds to lastRoundInSync + // Processing restarts from lastRoundInSync.endTime = initialEndTime (re-processing rounds 1-2 and continuing) + ReplicationRound expectedRound3 = new ReplicationRound(initialEndTime, initialEndTime + roundTimeMills); + assertEquals("Third round (after rewind) should restart from lastRoundInSync.endTime", expectedRound3, processedRounds.get(2)); + + ReplicationRound expectedRound4 = new ReplicationRound(initialEndTime + roundTimeMills, initialEndTime + (2 * roundTimeMills)); + assertEquals("Fourth round should match expected", expectedRound4, processedRounds.get(3)); + + ReplicationRound expectedRound5 = new ReplicationRound(initialEndTime + (2 * 
roundTimeMills), initialEndTime + (3 * roundTimeMills)); + assertEquals("Fifth round should match expected", expectedRound5, processedRounds.get(4)); + + ReplicationRound expectedRound6 = new ReplicationRound(initialEndTime + (3 * roundTimeMills), initialEndTime + (4 * roundTimeMills)); + assertEquals("Sixth round should match expected", expectedRound6, processedRounds.get(5)); + + ReplicationRound expectedRound7 = new ReplicationRound(initialEndTime + (4 * roundTimeMills), initialEndTime + (5 * roundTimeMills)); + assertEquals("Seventh round should match expected", expectedRound7, processedRounds.get(6)); + + // Verify lastRoundProcessed was updated to 7th round + ReplicationRound lastRoundAfterReplay = discovery.getLastRoundProcessed(); + assertEquals("Last round processed should be updated to 7th round", + expectedRound7, lastRoundAfterReplay); + + // Verify lastRoundInSync was preserved during DEGRADED, then updated during SYNC + // After transition to SYNC (from SYNCED_RECOVERY), lastRoundInSync should match lastRoundProcessed + ReplicationRound lastRoundInSync = discovery.getLastRoundInSync(); + assertNotNull("Last round in sync should not be null", lastRoundInSync); + assertEquals("Last round in sync should be updated to match last round processed after SYNC transition", + expectedRound7, lastRoundInSync); + + // Verify state transitioned to SYNC (from SYNCED_RECOVERY) + assertEquals("State should be SYNC after SYNCED_RECOVERY", + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC, + discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests state transition from SYNC to DEGRADED and back through SYNCED_RECOVERY to SYNC. + * Validates lastRoundInSync preservation during DEGRADED, rewind in SYNCED_RECOVERY, and update in SYNC. 
+ */ + @Test + public void testReplay_StateTransition_SyncToDegradedAndBackToSync() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704672000000L; + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + long bufferMillis = (long) (roundTimeMills * 0.15); + long totalWaitTime = roundTimeMills + bufferMillis; + + // Create HAGroupStoreRecord for STANDBY state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, initialEndTime); + + // Set current time to allow processing enough rounds (including rewind) + long currentTime = initialEndTime + (10 * roundTimeMills) + bufferMillis; + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundProcessed(initialRound); + discovery.setLastRoundInSync(initialRound); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + // Simulate state transitions: + // - After 2 rounds: SYNC -> DEGRADED + // - After 5 rounds: DEGRADED -> SYNCED_RECOVERY (triggers rewind to lastRoundInSync) + TestableReplicationLogDiscoveryReplay discoveryWithTransitions = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord) { + private int roundCount = 0; + + @Override + protected void processRound(ReplicationRound replicationRound) throws IOException { + super.processRound(replicationRound); + roundCount++; + + // Transition to DEGRADED after 2 rounds + if (roundCount == 2) { + 
setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED); + } + // Transition to SYNCED_RECOVERY after 5 rounds (will trigger rewind) + else if (roundCount == 5) { + setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY); + } + } + }; + + discoveryWithTransitions.setLastRoundProcessed(initialRound); + discoveryWithTransitions.setLastRoundInSync(initialRound); + discoveryWithTransitions.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + discoveryWithTransitions.replay(); + + // Verify processRound was called exactly 15 times: + // - 2 rounds in SYNC (rounds 1-2) + // - 3 rounds in DEGRADED (rounds 3-5) + // - 1 round in SYNCED_RECOVERY (round 6, triggers rewind to lastRoundInSync) + // - After rewind, continues from lastRoundInSync.endTime = firstRound + // - 8 more rounds in SYNC (rounds 7-14, from initialEndTime to initialEndTime + 8*roundTimeMills) + int totalRoundsProcessed = discoveryWithTransitions.getProcessRoundCallCount(); + assertEquals("processRound should be called exactly 14 times", 14, totalRoundsProcessed); + + // Verify the rounds passed to processRound + List<ReplicationRound> processedRounds = discoveryWithTransitions.getProcessedRounds(); + + // Rounds 1-2: SYNC mode (starting from initialEndTime) + for (int i = 0; i < 2; i++) { + long startTime = initialEndTime + (i * roundTimeMills); + long endTime = initialEndTime + ((i + 1) * roundTimeMills); + ReplicationRound expectedRound = new ReplicationRound(startTime, endTime); + assertEquals("Round " + (i + 1) + " (SYNC) should match expected", + expectedRound, processedRounds.get(i)); + } + + // Rounds 3-5: DEGRADED mode (continuing from round 2) + for (int i = 2; i < 5; i++) { + long startTime = initialEndTime + (i * roundTimeMills); + long endTime = initialEndTime + ((i + 1) * roundTimeMills); + ReplicationRound expectedRound = new ReplicationRound(startTime, endTime); + 
assertEquals("Round " + (i + 1) + " (DEGRADED) should match expected", + expectedRound, processedRounds.get(i)); + } + + // Round 6-14: SYNC mode after SYNCED_RECOVERY rewind + // After rewind, starts from lastRoundInSync.endTime = initialEndTime + roundTimeMills + for (int i = 5; i < 14; i++) { + // Offset by 1 because rewind goes back to lastRoundInSync.endTime + long startTime = initialEndTime + ((i - 4) * roundTimeMills); + long endTime = initialEndTime + ((i - 3) * roundTimeMills); + ReplicationRound expectedRound = new ReplicationRound(startTime, endTime); + assertEquals("Round " + (i + 1) + " (SYNC after rewind) should match expected", + expectedRound, processedRounds.get(i)); + } + + // Verify lastRoundProcessed was updated to the last processed round (round 14) + ReplicationRound lastRoundAfterReplay = discoveryWithTransitions.getLastRoundProcessed(); + assertNotNull("Last round processed should not be null", lastRoundAfterReplay); + ReplicationRound expectedLastRound = new ReplicationRound( + initialEndTime + (9 * roundTimeMills), + initialEndTime + (10 * roundTimeMills)); + assertEquals("Last round processed should be the 14th round", + expectedLastRound, lastRoundAfterReplay); + + // Verify lastRoundInSync behavior: + // - Updated for round 1 (SYNC) + // - State changed to DEGRADED after round 2, so lastRoundInSync stays at round 1 + // - Preserved during rounds 3-5 (DEGRADED) + // - SYNCED_RECOVERY triggers rewind to lastRoundInSync + // - After transition to SYNC, lastRoundInSync updates for rounds 6-14 + // - Final lastRoundInSync should match lastRoundProcessed (round 14) + ReplicationRound lastRoundInSync = discoveryWithTransitions.getLastRoundInSync(); + assertNotNull("Last round in sync should not be null", lastRoundInSync); + assertEquals("Last round in sync should match last round processed after returning to SYNC", + expectedLastRound, lastRoundInSync); + + // Verify state transitioned to SYNC (from SYNCED_RECOVERY) + assertEquals("State 
should be SYNC after SYNCED_RECOVERY", + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC, + discoveryWithTransitions.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + /** + * Tests replay when no rounds are ready to process. + * Validates that lastRoundProcessed and lastRoundInSync remain unchanged. + */ + @Test + public void testReplay_NoRoundsToProcess() throws IOException { + TestableReplicationLogTracker fileTracker = createReplicationLogTracker(conf, haGroupName, localFs, standbyUri); + + try { + long initialEndTime = 1704585600000L; + long roundTimeMills = fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds() * 1000L; + + // Create HAGroupStoreRecord for SYNC state + HAGroupStoreRecord mockRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.STANDBY, initialEndTime); + + // Set current time to NOT allow processing any rounds (not enough time has passed) + long currentTime = initialEndTime + 1000L; // Only 1 second after + EnvironmentEdge edge = () -> currentTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + TestableReplicationLogDiscoveryReplay discovery = + new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord); + ReplicationRound initialRound = new ReplicationRound(initialEndTime - roundTimeMills, initialEndTime); + discovery.setLastRoundProcessed(initialRound); + discovery.setLastRoundInSync(initialRound); + discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC); + + ReplicationRound lastRoundBeforeReplay = discovery.getLastRoundProcessed(); + ReplicationRound lastRoundInSyncBeforeReplay = discovery.getLastRoundInSync(); + + discovery.replay(); + + // Verify processRound was not called + assertEquals("processRound should not be called when no rounds to process", + 0, 
discovery.getProcessRoundCallCount()); + + // Verify lastRoundProcessed was not changed + ReplicationRound lastRoundAfterReplay = discovery.getLastRoundProcessed(); + assertEquals("Last round processed should not change when no rounds to process", + lastRoundBeforeReplay, lastRoundAfterReplay); + + // Verify lastRoundInSync was not changed + ReplicationRound lastRoundInSyncAfterReplay = discovery.getLastRoundInSync(); + assertEquals("Last round in sync should not change when no rounds to process", + lastRoundInSyncBeforeReplay, lastRoundInSyncAfterReplay); + + // Verify state remains SYNC + assertEquals("State should remain SYNC", + ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC, + discovery.getReplicationReplayState()); + } finally { + EnvironmentEdgeManager.reset(); + } + } finally { + fileTracker.close(); + } + } + + private TestableReplicationLogTracker createReplicationLogTracker(final Configuration conf, final String haGroupName, final FileSystem fileSystem, final URI rootURI) throws IOException { + TestableReplicationLogTracker testableReplicationLogTracker = new TestableReplicationLogTracker(conf, haGroupName, fileSystem, rootURI, ReplicationLogTracker.DirectoryType.IN, METRICS_REPLICATION_LOG_TRACKER); + testableReplicationLogTracker.init(); + return testableReplicationLogTracker; + } + + /** + * Testable implementation of ReplicationLogTracker for unit testing. + * Exposes protected methods to allow test access. + */ + private class TestableReplicationLogTracker extends ReplicationLogTracker { + public TestableReplicationLogTracker(Configuration conf, String haGroupName, FileSystem fileSystem, URI rootURI, DirectoryType directoryType, MetricsReplicationLogTracker metrics) { + super(conf, haGroupName, fileSystem, rootURI, directoryType, metrics); + } + public Path getInProgressDirPath() { + return super.getInProgressDirPath(); + } + } + + /** + * Testable implementation of ReplicationLogDiscoveryReplay for unit testing. 
+ * Provides dependency injection for HAGroupStoreRecord, tracks processed rounds, + * and supports simulating state changes during replay. + */ + private static class TestableReplicationLogDiscoveryReplay extends ReplicationLogDiscoveryReplay { + private final HAGroupStoreRecord haGroupStoreRecord; + private int roundsProcessed = 0; + private int stateChangeAfterRounds = -1; + private ReplicationReplayState newStateAfterRounds = null; + private final List<ReplicationRound> processedRounds = new java.util.ArrayList<>(); + + public TestableReplicationLogDiscoveryReplay(ReplicationLogTracker replicationLogReplayFileTracker, + HAGroupStoreRecord haGroupStoreRecord) { + super(replicationLogReplayFileTracker); + this.haGroupStoreRecord = haGroupStoreRecord; + } + + @Override + protected HAGroupStoreRecord getHAGroupRecord() { + return haGroupStoreRecord; + } + + @Override + protected void processRound(ReplicationRound replicationRound) throws IOException { + System.out.println("Processing Round: " + replicationRound); + // Track processed rounds + processedRounds.add(replicationRound); + + // Simulate state change by listener after certain number of rounds + roundsProcessed++; + if (stateChangeAfterRounds > 0 && roundsProcessed == stateChangeAfterRounds && newStateAfterRounds != null) { + System.out.println("Rounds Processed: " + roundsProcessed + " - " + newStateAfterRounds); Review Comment: Same. ########## phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java: ########## @@ -0,0 +1,1467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import 
org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Unit tests for ReplicationLogDiscovery: lifecycle (start/stop of the internal
 * scheduler) and round processing against a local filesystem tracker.
 * Uses a Mockito spy so interactions such as shouldProcessInProgressDirectory()
 * can be verified and metrics can be stubbed.
 */
public class ReplicationLogDiscoveryTest {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogDiscoveryTest.class);

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();

  // Spied system under test and its (also spied) file tracker, rebuilt per test in setUp().
  private TestableReplicationLogDiscovery discovery;
  private TestableReplicationLogTracker fileTracker;
  private Configuration conf;
  private FileSystem localFs;
  private URI rootURI;
  private Path testFolderPath;
  private static final String haGroupName = "testGroup";
  private static final MetricsReplicationLogTracker metricsLogTracker =
      new MetricsReplicationLogTrackerReplayImpl(haGroupName);
  private static final MetricsReplicationLogDiscovery metricsLogDiscovery =
      new MetricsReplicationLogDiscoveryReplayImpl(haGroupName);

  /**
   * Builds a fresh spied tracker + discovery pair over the local filesystem rooted at the
   * temporary folder, and stubs getMetrics() to return the shared metrics instance.
   */
  @Before
  public void setUp() throws IOException {
    conf = HBaseConfiguration.create();
    localFs = FileSystem.getLocal(conf);
    rootURI = new Path(testFolder.getRoot().toString()).toUri();
    testFolderPath = new Path(testFolder.getRoot().getAbsolutePath());
    fileTracker = Mockito.spy(new TestableReplicationLogTracker(conf, haGroupName, localFs, rootURI));
    fileTracker.init();

    discovery = Mockito.spy(new TestableReplicationLogDiscovery(fileTracker));
    Mockito.doReturn(metricsLogDiscovery).when(discovery).getMetrics();
  }

  /**
   * Stops and closes the discovery and tracker (null-guarded in case setUp failed part-way),
   * then wipes the temporary folder contents so tests do not leak files into each other.
   */
  @After
  public void tearDown() throws IOException {
    if (discovery != null) {
      discovery.stop();
      discovery.close();
    }
    if (fileTracker != null) {
      fileTracker.close();
    }
    localFs.delete(new Path(testFolder.getRoot().toURI()), true);
  }

  /**
   * Tests the start and stop lifecycle of ReplicationLogDiscovery.
   * Validates scheduler initialization, thread naming, and proper cleanup.
   */
  @Test
  public void testStartAndStop() throws IOException {
    // 1. Validate that it's not running initially
    assertFalse("Discovery should not be running initially", discovery.isRunning());

    // 2. Validate that scheduler is set to null initially
    assertNull("Scheduler should be null initially", discovery.getScheduler());

    // 3. Call the start method
    discovery.start();

    // 4. Ensure isRunning is set to true
    assertTrue("Discovery should be running after start", discovery.isRunning());

    // 5. Ensure scheduler is started with correct parameters
    assertNotNull("Scheduler should not be null after start", discovery.getScheduler());
    assertFalse("Scheduler should not be shutdown after start",
        discovery.getScheduler().isShutdown());

    // Verify thread name format
    String threadName = discovery.getExecutorThreadNameFormat();
    assertTrue("Thread name should contain ReplicationLogDiscovery",
        threadName.contains("ReplicationLogDiscovery"));

    // Verify replay interval
    long replayInterval = discovery.getReplayIntervalSeconds();
    assertEquals("Replay interval should be 10 seconds", 10L, replayInterval);

    // 6. Ensure starting again does not create a new scheduler (and also should not throw
    // any exception)
    ScheduledExecutorService originalScheduler = discovery.getScheduler();
    discovery.start(); // Should not create new scheduler
    ScheduledExecutorService sameScheduler = discovery.getScheduler();
    assertSame("Should reuse the same scheduler instance", originalScheduler, sameScheduler);
    assertTrue("Discovery should still be running", discovery.isRunning());

    // 7. Call stop
    discovery.stop();

    // 8. Ensure scheduler is stopped
    assertTrue("Scheduler should be shutdown after stop", discovery.getScheduler().isShutdown());

    // 9. Ensure isRunning is false
    assertFalse("Discovery should not be running after stop", discovery.isRunning());
  }

  /**
   * Tests processRound with in-progress directory processing enabled.
   * Validates that both new files and in-progress files are processed correctly.
   */
  @Test
  public void testProcessRoundWithInProgressDirectoryProcessing() throws IOException {
    // 1. Create new files with start of the day round (00:00:00)
    ReplicationRound replicationRound =
        new ReplicationRound(1704153600000L, 1704153660000L); // 00:00:00 - 00:01:00
    List<Path> newFilesForRound = createNewFilesForRound(replicationRound, 3);

    // 2. Create file for shard count min round (which should also go to same shard)
    int shardCount = fileTracker.getReplicationShardDirectoryManager().getAllShardPaths().size();
    ReplicationRound differentRoundSameShard = new ReplicationRound(
        1704153600000L + (shardCount * 60 * 1000L), 1704153660000L + (shardCount * 60 * 1000L));
    List<Path> differentRoundSameShardFiles = createNewFilesForRound(differentRoundSameShard, 2);

    // 3. Create files for (00:01:00) and (00:02:00) start time of the rounds
    ReplicationRound round0100 =
        new ReplicationRound(1704153660000L, 1704153720000L); // 00:01:00 - 00:02:00
    List<Path> round0100NewFiles = createNewFilesForRound(round0100, 2);
    ReplicationRound round0200 =
        new ReplicationRound(1704153720000L, 1704153780000L); // 00:02:00 - 00:03:00
    List<Path> round0200NewFiles = createNewFilesForRound(round0200, 2);

    // 4. Create 2 in progress files for (00:00:04) timestamp
    long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04
    List<Path> inProgressFiles0004 = createInProgressFiles(timestamp0004, 2);

    // 5. Create 2 in progress files for (00:01:02) timestamp
    long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02
    List<Path> inProgressFiles0102 = createInProgressFiles(timestamp0102, 2);

    // 6. Mock shouldProcessInProgressDirectory to return true
    discovery.setMockShouldProcessInProgressDirectory(true);

    // Process the start of day round
    discovery.processRound(replicationRound);

    // 7. Ensure current round new files (3) are processed and in progress (4) are
    // processed (Total 7)
    List<Path> processedFiles = discovery.getProcessedFiles();
    assertEquals("Invalid number of files processed", 7, processedFiles.size());

    // Create set of expected files that should be processed
    Set<String> expectedProcessedPaths = new HashSet<>();
    for (Path file : newFilesForRound) {
      expectedProcessedPaths.add(file.toUri().getPath());
    }
    for (Path file : inProgressFiles0004) {
      expectedProcessedPaths.add(file.toUri().getPath());
    }
    for (Path file : inProgressFiles0102) {
      expectedProcessedPaths.add(file.toUri().getPath());
    }

    // Create set of actually processed file paths
    Set<String> actualProcessedPaths = new HashSet<>();
    for (Path file : processedFiles) {
      actualProcessedPaths.add(file.toUri().getPath());
    }

    // Validate that sets are equal
    assertEquals("Expected and actual processed files should match",
        expectedProcessedPaths, actualProcessedPaths);

    // Verify that shouldProcessInProgressDirectory was called once
    Mockito.verify(discovery, Mockito.times(1)).shouldProcessInProgressDirectory();

    // Validate that files from other rounds were NOT processed
    for (Path unexpectedFile : differentRoundSameShardFiles) {
      assertFalse("Should NOT have processed shard count round file: " + unexpectedFile.getName(),
          processedFiles.stream()
              .anyMatch(p -> p.toUri().getPath().equals(unexpectedFile.toUri().getPath())));
    }

    for (Path unexpectedFile : round0100NewFiles) {
      assertFalse("Should NOT have processed round 00:01:00 file: " + unexpectedFile.getName(),
          processedFiles.stream()
              .anyMatch(p -> p.toUri().getPath().equals(unexpectedFile.toUri().getPath())));
    }

    for (Path unexpectedFile : round0200NewFiles) {
      assertFalse("Should NOT have processed round 00:02:00 file: " + unexpectedFile.getName(),
          processedFiles.stream()
              .anyMatch(p -> p.toUri().getPath().equals(unexpectedFile.toUri().getPath())));
    }
  }

  /**
   * Tests
processRound with in-progress directory processing disabled. + * Validates that only new files for the current round are processed. + */ + @Test + public void testProcessRoundWithoutInProgressDirectoryProcessing() throws IOException { + // 1. Create new files with start of the day round (00:00:00) + ReplicationRound replicationRound = new ReplicationRound(1704153600000L, 1704153660000L); // 00:00:00 - 00:01:00 + List<Path> newFilesForRound = createNewFilesForRound(replicationRound, 3); + + // 2. Create file for shard count min round (which should also go to same shard) + int shardCount = fileTracker.getReplicationShardDirectoryManager().getAllShardPaths().size(); + ReplicationRound differentRoundSameShard = new ReplicationRound(1704153600000L + (shardCount * 60 * 1000L), 1704153660000L + (shardCount * 60 * 1000L)); + List<Path> differentRoundSameShardFiles = createNewFilesForRound(differentRoundSameShard, 2); + + // 3. Create files for (00:01:00) and (00:02:00) start time of the rounds + ReplicationRound round0100 = new ReplicationRound(1704153660000L, 1704153720000L); // 00:01:00 - 00:02:00 + List<Path> round0100NewFiles = createNewFilesForRound(round0100, 2); + ReplicationRound round0200 = new ReplicationRound(1704153720000L, 1704153780000L); // 00:02:00 - 00:03:00 + List<Path> round0200NewFiles = createNewFilesForRound(round0200, 2); + + // 4. Create 2 in progress files for (00:00:04) timestamp + long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 + List<Path> inProgressFiles0004 = createInProgressFiles(timestamp0004, 2); + + // 5. Create 2 in progress files for (00:01:02) timestamp + long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02 + List<Path> inProgressFiles0102 = createInProgressFiles(timestamp0102, 2); + + // 6. Mock shouldProcessInProgressDirectory to return false + discovery.setMockShouldProcessInProgressDirectory(false); + + // Process the start of day round + discovery.processRound(replicationRound); + + // 7. 
Ensure only current round new files (3) are processed (Total 3, no in-progress files) + List<Path> processedFiles = discovery.getProcessedFiles(); + assertEquals("Invalid number of files processed", 3, processedFiles.size()); + + System.out.println("Processed files"); + for (Path file : processedFiles) { + System.out.println(file); + } + Review Comment: Same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
