tkhurana commented on code in PR #2144:
URL: https://github.com/apache/phoenix/pull/2144#discussion_r2105502815


##########
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java:
##########
@@ -0,0 +1,1229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+
+public class ReplicationLogTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogTest.class);
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private Configuration conf;
+    private ServerName serverName;
+    private FileSystem localFs;
+    private URI standbyUri;
+    private ReplicationLog logWriter;
+
+    static final int TEST_RINGBUFFER_SIZE = 32;
+    static final int TEST_SYNC_TIMEOUT = 1000;
+    static final int TEST_ROTATION_TIME = 5000;
+    static final int TEST_ROTATION_SIZE_BYTES = 10 * 1024;
+
+    @Before
+    public void setUp() throws IOException {
+        conf = HBaseConfiguration.create();
+        localFs = FileSystem.getLocal(conf);
+        standbyUri = new Path(testFolder.toString()).toUri();
+        serverName = ServerName.valueOf("test", 60010, 
EnvironmentEdgeManager.currentTimeMillis());
+        conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
+        // Small ring buffer size for testing
+        conf.setInt(ReplicationLog.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
TEST_RINGBUFFER_SIZE);
+        // Set a short sync timeout for testing
+        conf.setLong(ReplicationLog.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
TEST_SYNC_TIMEOUT);
+        // Set rotation time to 5 seconds
+        conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
TEST_ROTATION_TIME);
+        // Small size threshold for testing
+        conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+            TEST_ROTATION_SIZE_BYTES);
+
+        logWriter = spy(new TestableReplicationLog(conf, serverName));
+        logWriter.init();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (logWriter != null) {
+            logWriter.close();
+        }
+        // Deregister the metrics source that the replication log registers 
during initialization
+        // so the next unit will be able to register it again and successfully 
initialize.
+        DefaultMetricsSystem.instance()
+            .unregisterSource(MetricsReplicationLogSource.METRICS_JMX_CONTEXT);
+    }
+
+    /**
+     * Tests basic append and sync functionality of the replication log. 
Verifies that mutations
+     * are correctly appended to the log and that sync operations properly 
commit the changes to
+     * disk.
+     */
+    @Test
+    public void testAppendAndSync() throws Exception {
+        final String tableName = "TESTTBL";
+        final long commitId1 = 1L;
+        final long commitId2 = 2L;
+        final long commitId3 = 3L;
+        final long commitId4 = 4L;
+        final long commitId5 = 5L;
+        final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+        final Mutation put2 = LogFileTestUtil.newPut("row2", 2, 1);
+        final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
+        final Mutation put4 = LogFileTestUtil.newPut("row4", 4, 1);
+        final Mutation put5 = LogFileTestUtil.newPut("row5", 5, 1);
+
+        // Get the inner writer
+        LogFileWriter writer = logWriter.getWriter();
+        assertNotNull("Writer should not be null", writer);
+        InOrder inOrder = Mockito.inOrder(writer);
+
+        logWriter.append(tableName, commitId1, put1);
+        logWriter.append(tableName, commitId2, put2);
+        logWriter.append(tableName, commitId3, put3);
+        logWriter.append(tableName, commitId4, put4);
+        logWriter.append(tableName, commitId5, put5);
+
+        logWriter.sync();
+
+        // Happens-before ordering verification, using Mockito's inOrder. 
Verify that the appends
+        // happen before sync, and sync happened after appends.
+        inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1), 
eq(put1));
+        inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2), 
eq(put2));
+        inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3), 
eq(put3));
+        inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4), 
eq(put4));
+        inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5), 
eq(put5));
+        inOrder.verify(writer, times(1)).sync();
+    }
+
+    /**
+     * Tests the behavior when an append operation fails. Verifies that the 
system properly handles
+     * append failures by rolling to a new writer and retrying the operation.
+     */
+    @Test
+    public void testAppendFailureAndRetry() throws Exception {
+        final String tableName = "TBLAFR";
+        final long commitId = 1L;
+        final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+        // Get the inner writer
+        LogFileWriter writerBeforeRoll = logWriter.getWriter();
+        assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+        // Configure writerBeforeRoll to fail on the first append call
+        doThrow(new IOException("Simulated append failure"))
+            .when(writerBeforeRoll).append(anyString(), anyLong(), 
any(Mutation.class));
+
+        // Append data
+        logWriter.append(tableName, commitId, put);
+        logWriter.sync();
+
+        // Get the inner writer we rolled to.
+        LogFileWriter writerAfterRoll = logWriter.getWriter();
+        assertNotNull("Initial writer should not be null", writerBeforeRoll);

Review Comment:
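   Should this assertion be checking `writerAfterRoll` rather than `writerBeforeRoll`? As written it re-checks the writer obtained before the roll.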



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to