This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 53b108bcd Revert "[server] Recover log and index file for unclean shutdown (#1749)" (#2036)
53b108bcd is described below

commit 53b108bcdc987ef0744541a73f5a909b8910b4be
Author: yunhong <[email protected]>
AuthorDate: Thu Nov 27 13:38:32 2025 +0800

    Revert "[server] Recover log and index file for unclean shutdown (#1749)" 
(#2036)
    
    This reverts commit d5cb521471eeac7a07120ecde1a199c61c62dd02.
---
 .../org/apache/fluss/server/log/LogLoader.java     | 151 +---------
 .../org/apache/fluss/server/log/LogSegment.java    |  20 +-
 .../org/apache/fluss/server/log/LogTablet.java     |   2 +-
 .../fluss/server/log/WriterStateManager.java       |   4 -
 .../org/apache/fluss/server/log/LogLoaderTest.java | 325 ---------------------
 5 files changed, 3 insertions(+), 499 deletions(-)
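
For context on what is being reverted: the removed LogLoader code implemented
standard unclean-shutdown recovery, i.e. replay the log tail past the recovery
point, validate each batch, and truncate at the first sign of corruption. A
minimal, self-contained sketch of that idea (hypothetical length+CRC record
framing and plain java.io, not the Fluss API):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.zip.CRC32;

    /** Sketch: truncate a length+CRC framed log at the first corrupt record. */
    public final class TailRecoverySketch {
        /** Returns the valid length of the log after truncation. */
        public static long recover(RandomAccessFile log) throws IOException {
            long validEnd = 0;                // bytes verified so far
            long fileLen = log.length();
            while (validEnd + 8 <= fileLen) { // 4-byte length + 4-byte CRC header
                log.seek(validEnd);
                int len = log.readInt();
                int expectedCrc = log.readInt();
                if (len < 0 || validEnd + 8 + len > fileLen) {
                    break;                    // truncated or garbage header
                }
                byte[] payload = new byte[len];
                log.readFully(payload);
                CRC32 crc = new CRC32();
                crc.update(payload);
                if ((int) crc.getValue() != expectedCrc) {
                    break;                    // corrupt payload, stop here
                }
                validEnd += 8 + len;          // record verified, advance
            }
            log.setLength(validEnd);          // lop off invalid bytes at the end
            return validEnd;
        }
    }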

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index 95f41c021..62d38581c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -19,7 +19,6 @@ package org.apache.fluss.server.log;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.InvalidOffsetException;
 import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
@@ -32,13 +31,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -123,37 +117,6 @@ final class LogLoader {
                         nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
     }
 
-    /**
-     * Just recovers the given segment, without adding it to the provided segments.
-     *
-     * @param segment Segment to recover
-     * @return The number of bytes truncated from the segment
-     * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
-     *     offset overflow
-     */
-    private int recoverSegment(LogSegment segment) throws IOException {
-        WriterStateManager writerStateManager =
-                new WriterStateManager(
-                        logSegments.getTableBucket(),
-                        logTabletDir,
-                        this.writerStateManager.writerExpirationMs());
-        // TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
-        // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
-        // there may be cases where logStartOffset is not updated. As a result, logStartOffset is
-        // not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
-        // issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
-        // Additionally, using 0 versus using logStartOffset does not affect correctness—they both
-        // can restore the complete WriterState. The only difference is that using logStartOffset
-        // can potentially skip over more segments.
-        LogTablet.rebuildWriterState(
-                writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
-        int bytesTruncated = segment.recover();
-        // once we have recovered the segment's data, take a snapshot to ensure that we won't
-        // need to reload the same segment again while recovering another segment.
-        writerStateManager.takeSnapshot();
-        return bytesTruncated;
-    }
-
     /**
      * Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
      * active segment, and returns the updated recovery point and next offset after recovery.
@@ -166,106 +129,14 @@ final class LogLoader {
      *     overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        if (!isCleanShutdown) {
-            List<LogSegment> unflushed =
-                    logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
-            int numUnflushed = unflushed.size();
-            Iterator<LogSegment> unflushedIter = unflushed.iterator();
-            boolean truncated = false;
-            int numFlushed = 1;
-
-            while (unflushedIter.hasNext() && !truncated) {
-                LogSegment segment = unflushedIter.next();
-                LOG.info(
-                        "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
-                        segment.getBaseOffset(),
-                        numFlushed,
-                        numUnflushed,
-                        logSegments.getTableBucket());
-
-                int truncatedBytes = -1;
-                try {
-                    truncatedBytes = recoverSegment(segment);
-                } catch (Exception e) {
-                    if (e instanceof InvalidOffsetException) {
-                        long startOffset = segment.getBaseOffset();
-                        LOG.warn(
-                                "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
-                                        + "and creating an empty one with starting offset {}",
-                                logSegments.getTableBucket(),
-                                startOffset);
-                        truncatedBytes = segment.truncateTo(startOffset);
-                    } else {
-                        throw e;
-                    }
-                }
-
-                if (truncatedBytes > 0) {
-                    // we had an invalid message, delete all remaining log
-                    LOG.warn(
-                            "Corruption found in segment {} for bucket {}, truncating to offset {}",
-                            segment.getBaseOffset(),
-                            logSegments.getTableBucket(),
-                            segment.readNextOffset());
-                    removeAndDeleteSegments(unflushedIter);
-                    truncated = true;
-                } else {
-                    numFlushed += 1;
-                }
-            }
-        }
-
+        // TODO truncate log to recover maybe unflushed segments.
         if (logSegments.isEmpty()) {
-            // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
         }
         long logEndOffset = logSegments.lastSegment().get().readNextOffset();
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
-    /**
-     * This method deletes the given log segments and the associated writer snapshots.
-     *
-     * <p>This method does not need to convert IOException to {@link LogStorageException} because it
-     * is either called before all logs are loaded or the immediate caller will catch and handle
-     * IOException
-     *
-     * @param segmentsToDelete The log segments to schedule for deletion
-     */
-    private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
-        if (segmentsToDelete.hasNext()) {
-            List<LogSegment> toDelete = new ArrayList<>();
-            segmentsToDelete.forEachRemaining(toDelete::add);
-
-            LOG.info(
-                    "Deleting segments for bucket {} as part of log recovery: {}",
-                    logSegments.getTableBucket(),
-                    toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
-            toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
-
-            try {
-                LocalLog.deleteSegmentFiles(
-                        toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
-            } catch (IOException e) {
-                LOG.error(
-                        "Failed to delete truncated segments {} for bucket {}",
-                        toDelete,
-                        logSegments.getTableBucket(),
-                        e);
-            }
-
-            try {
-                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
-            } catch (IOException e) {
-                LOG.error(
-                        "Failed to delete truncated writer snapshots {} for bucket {}",
-                        toDelete,
-                        logSegments.getTableBucket(),
-                        e);
-            }
-        }
-    }
-
     /** Loads segments from disk into the provided segments. */
     private void loadSegmentFiles() throws IOException {
         File[] sortedFiles = logTabletDir.listFiles();
@@ -285,28 +156,8 @@ final class LogLoader {
                         }
                     } else if (LocalLog.isLogFile(file)) {
                         long baseOffset = FlussPaths.offsetFromFile(file);
-                        boolean timeIndexFileNewlyCreated =
-                                !FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
                         LogSegment segment =
                                 LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
-
-                        try {
-                            segment.sanityCheck(timeIndexFileNewlyCreated);
-                        } catch (IOException e) {
-                            if (e instanceof NoSuchFileException) {
-                                if (isCleanShutdown
-                                        || segment.getBaseOffset() < recoveryPointCheckpoint) {
-                                    LOG.error(
-                                            "Could not find offset index file corresponding to log file {} "
-                                                    + "for bucket {}, recovering segment and rebuilding index files...",
-                                            logSegments.getTableBucket(),
-                                            segment.getFileLogRecords().file().getAbsoluteFile());
-                                }
-                                recoverSegment(segment);
-                            } else {
-                                throw e;
-                            }
-                        }
                         logSegments.add(segment);
                     }
                 }
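
The deleted recoverSegment() above also relied on a snapshot-per-recovered-segment
pattern: after each segment's indexes are rebuilt, the writer state is
checkpointed so that recovering the next segment can load the snapshot instead
of rescanning earlier segments. A hedged sketch of that pattern; Segment and
WriterState here are illustrative stand-ins, not the Fluss classes:

    import java.util.List;

    interface Segment {
        long baseOffset();
        int recover(); // rebuilds indexes, returns the number of truncated bytes
    }

    interface WriterState {
        void rebuildUpTo(long offset); // replays the log, or loads a snapshot
        void takeSnapshot();           // checkpoints state at the current offset
    }

    final class SegmentRecoverySketch {
        static void recoverAll(List<Segment> unflushed, WriterState state) {
            for (Segment segment : unflushed) {
                // cheap if the previous iteration left a snapshot at this offset
                state.rebuildUpTo(segment.baseOffset());
                segment.recover();
                // snapshot now so the next rebuild does not rescan this segment
                state.takeSnapshot();
            }
        }
    }
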
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 93bdbdb65..0a30d5f5f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -44,7 +44,6 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.NoSuchFileException;
 import java.util.Optional;
 
 import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
@@ -173,23 +172,6 @@ public final class LogSegment {
         timeIndex().resize(size);
     }
 
-    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
-        if (lazyOffsetIndex.file().exists()) {
-            // Resize the time index file to 0 if it is newly created.
-            if (timeIndexFileNewlyCreated) {
-                timeIndex().resize(0);
-            }
-            // Sanity checks for time index and offset index are skipped because
-            // we will recover the segments above the recovery point in recoverLog()
-            // in any case so sanity checking them here is redundant.
-        } else {
-            throw new NoSuchFileException(
-                    "Offset index file "
-                            + lazyOffsetIndex.file().getAbsolutePath()
-                            + " does not exist.");
-        }
-    }
-
     /**
      * The maximum timestamp we see so far.
      *
@@ -302,7 +284,7 @@ public final class LogSegment {
      * Run recovery on the given segment. This will rebuild the index from the log file and lop off
      * any invalid bytes from the end of the log and index.
      */
-    public int recover() throws IOException {
+    public int recover() throws Exception {
         offsetIndex().reset();
         timeIndex().reset();
         int validBytes = 0;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 8a5c54cbd..bf75410e4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1283,7 +1283,7 @@ public final class LogTablet {
         loadedWriters.values().forEach(writerStateManager::update);
     }
 
-    public static void deleteWriterSnapshots(
+    private static void deleteWriterSnapshots(
             List<LogSegment> segments, WriterStateManager writerStateManager) throws IOException {
         for (LogSegment segment : segments) {
             writerStateManager.removeAndDeleteSnapshot(segment.getBaseOffset());
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index 30afb849a..4fa0baced 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -99,10 +99,6 @@ public class WriterStateManager {
         this.snapshots = loadSnapshots();
     }
 
-    public int writerExpirationMs() {
-        return writerExpirationMs;
-    }
-
     public int writerIdCount() {
         return writerIdCount;
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
deleted file mode 100644
index 7cec74533..000000000
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.log;
-
-import org.apache.fluss.compression.ArrowCompressionInfo;
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.MemorySize;
-import org.apache.fluss.metadata.LogFormat;
-import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.LogTestBase;
-import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.server.exception.CorruptIndexException;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
-import org.apache.fluss.utils.clock.Clock;
-import org.apache.fluss.utils.clock.ManualClock;
-import org.apache.fluss.utils.clock.SystemClock;
-import org.apache.fluss.utils.concurrent.FlussScheduler;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
-import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link LogLoader}. */
-final class LogLoaderTest extends LogTestBase {
-
-    private @TempDir File tempDir;
-    private FlussScheduler scheduler;
-    private File logDir;
-    private Clock clock;
-
-    @BeforeEach
-    public void setup() throws Exception {
-        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("10kb"));
-        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, MemorySize.parse("1b"));
-
-        logDir =
-                LogTestUtils.makeRandomLogTabletDir(
-                        tempDir,
-                        DATA1_TABLE_PATH.getDatabaseName(),
-                        DATA1_TABLE_ID,
-                        DATA1_TABLE_PATH.getTableName());
-
-        scheduler = new FlussScheduler(1);
-        scheduler.startup();
-
-        clock = new ManualClock();
-    }
-
-    // TODO: add more tests like Kafka LogLoaderTest
-
-    @Test
-    void testCorruptIndexRebuild() throws Exception {
-        // publish the records and close the log
-        int numRecords = 200;
-        LogTablet logTablet = createLogTablet(true);
-        appendRecords(logTablet, numRecords);
-        // collect all the index files
-        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
-        logTablet.close();
-
-        // corrupt all the index files
-        for (File indexFile : indexFiles) {
-            try (FileChannel fileChannel =
-                    FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
-                for (int i = 0; i < 12; i++) {
-                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
-                }
-            }
-        }
-
-        // test reopen the log without recovery, sanity check of index files should throw exception
-        logTablet = createLogTablet(true);
-        for (LogSegment segment : logTablet.logSegments()) {
-            if (segment.getBaseOffset() != logTablet.activeLogSegment().getBaseOffset()) {
-                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
-                        .isInstanceOf(CorruptIndexException.class)
-                        .hasMessage(
-                                String.format(
-                                        "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d",
-                                        segment.offsetIndex().file().getAbsolutePath(),
-                                        segment.offsetIndex().length(),
-                                        segment.offsetIndex().entrySize()));
-                assertThatThrownBy(segment.timeIndex()::sanityCheck)
-                        .isInstanceOf(CorruptIndexException.class)
-                        .hasMessageContaining(
-                                String.format(
-                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
-                                        segment.timeIndex().file().getAbsolutePath()));
-            } else {
-                // the offset index file of active segment will be resized, which case no corruption
-                // exception when doing sanity check
-                segment.offsetIndex().sanityCheck();
-                assertThatThrownBy(segment.timeIndex()::sanityCheck)
-                        .isInstanceOf(CorruptIndexException.class)
-                        .hasMessageContaining(
-                                String.format(
-                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
-                                        segment.timeIndex().file().getAbsolutePath()));
-            }
-        }
-        logTablet.close();
-
-        // test reopen the log with recovery, sanity check of index files should no corruption
-        logTablet = createLogTablet(false);
-        for (LogSegment segment : logTablet.logSegments()) {
-            segment.offsetIndex().sanityCheck();
-            segment.timeIndex().sanityCheck();
-        }
-        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
-        for (int i = 0; i < numRecords; i++) {
-            assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10))
-                    .isEqualTo(i);
-        }
-        logTablet.close();
-    }
-
-    @Test
-    void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
-        // publish the records and close the log
-        int numRecords = 200;
-        LogTablet logTablet = createLogTablet(true);
-        appendRecords(logTablet, numRecords);
-        // collect all the index files
-        long recoveryPoint = logTablet.localLogEndOffset() / 2;
-        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
-        logTablet.close();
-
-        // corrupt all the index files
-        for (File indexFile : indexFiles) {
-            try (FileChannel fileChannel =
-                    FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
-                for (int i = 0; i < 12; i++) {
-                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
-                }
-            }
-        }
-
-        // test reopen the log with recovery point
-        logTablet = createLogTablet(false, recoveryPoint);
-        List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint, Long.MAX_VALUE);
-        assertThat(logSegments.size() < logTablet.logSegments().size()).isTrue();
-        Set<Long> recoveredSegments =
-                logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
-        for (LogSegment segment : logTablet.logSegments()) {
-            if (recoveredSegments.contains(segment.getBaseOffset())) {
-                segment.offsetIndex().sanityCheck();
-                segment.timeIndex().sanityCheck();
-            } else {
-                // the segments before recovery point will not be recovered, so sanity check should
-                // still throw corrupt exception
-                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
-                        .isInstanceOf(CorruptIndexException.class)
-                        .hasMessage(
-                                String.format(
-                                        "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d",
-                                        segment.offsetIndex().file().getAbsolutePath(),
-                                        segment.offsetIndex().length(),
-                                        segment.offsetIndex().entrySize()));
-                assertThatThrownBy(segment.timeIndex()::sanityCheck)
-                        .isInstanceOf(CorruptIndexException.class)
-                        .hasMessageContaining(
-                                String.format(
-                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
-                                        segment.timeIndex().file().getAbsolutePath()));
-            }
-        }
-    }
-
-    @Test
-    void testIndexRebuild() throws Exception {
-        // publish the records and close the log
-        int numRecords = 200;
-        LogTablet logTablet = createLogTablet(true);
-        appendRecords(logTablet, numRecords);
-        // collect all index files
-        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
-        logTablet.close();
-
-        // delete all the index files
-        indexFiles.forEach(File::delete);
-
-        // reopen the log
-        logTablet = createLogTablet(false);
-        assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
-        // the index files should be rebuilt
-        assertThat(logTablet.logSegments().get(0).offsetIndex().entries() > 0).isTrue();
-        assertThat(logTablet.logSegments().get(0).timeIndex().entries() > 0).isTrue();
-        for (int i = 0; i < numRecords; i++) {
-            assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10))
-                    .isEqualTo(i);
-        }
-        logTablet.close();
-    }
-
-    @Test
-    void testInvalidOffsetRebuild() throws Exception {
-        // publish the records and close the log
-        int numRecords = 200;
-        LogTablet logTablet = createLogTablet(true);
-        appendRecords(logTablet, numRecords);
-
-        List<LogSegment> logSegments = logTablet.logSegments();
-        int corruptSegmentIndex = logSegments.size() / 2;
-        assertThat(corruptSegmentIndex < logSegments.size()).isTrue();
-        LogSegment corruptSegment = logSegments.get(corruptSegmentIndex);
-
-        // append an invalid offset batch
-        List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
-        List<ChangeType> changeTypes =
-                objects.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList());
-        MemoryLogRecords memoryLogRecords =
-                createBasicMemoryLogRecords(
-                        DATA1_ROW_TYPE,
-                        DEFAULT_SCHEMA_ID,
-                        corruptSegment.getBaseOffset(),
-                        clock.milliseconds(),
-                        magic,
-                        System.currentTimeMillis(),
-                        0,
-                        changeTypes,
-                        objects,
-                        LogFormat.ARROW,
-                        ArrowCompressionInfo.DEFAULT_COMPRESSION);
-        corruptSegment.getFileLogRecords().append(memoryLogRecords);
-        logTablet.close();
-
-        logTablet = createLogTablet(false);
-        // the corrupt segment should be truncated to base offset
-        assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
-        // segments after the corrupt segment should be removed
-        assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
-    }
-
-    private LogTablet createLogTablet(boolean isCleanShutdown) throws Exception {
-        return createLogTablet(isCleanShutdown, 0);
-    }
-
-    private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint)
-            throws Exception {
-        return LogTablet.create(
-                PhysicalTablePath.of(DATA1_TABLE_PATH),
-                logDir,
-                conf,
-                TestingMetricGroups.TABLET_SERVER_METRICS,
-                recoveryPoint,
-                scheduler,
-                LogFormat.ARROW,
-                1,
-                false,
-                SystemClock.getInstance(),
-                isCleanShutdown);
-    }
-
-    private void appendRecords(LogTablet logTablet, int numRecords) throws Exception {
-        int baseOffset = 0;
-        int batchSequence = 0;
-        for (int i = 0; i < numRecords; i++) {
-            List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
-            List<ChangeType> changeTypes =
-                    objects.stream()
-                            .map(row -> ChangeType.APPEND_ONLY)
-                            .collect(Collectors.toList());
-            MemoryLogRecords memoryLogRecords =
-                    createBasicMemoryLogRecords(
-                            DATA1_ROW_TYPE,
-                            DEFAULT_SCHEMA_ID,
-                            baseOffset,
-                            clock.milliseconds() + i * 10L,
-                            magic,
-                            System.currentTimeMillis(),
-                            batchSequence,
-                            changeTypes,
-                            objects,
-                            LogFormat.ARROW,
-                            ArrowCompressionInfo.DEFAULT_COMPRESSION);
-            logTablet.appendAsFollower(memoryLogRecords);
-            baseOffset++;
-            batchSequence++;
-        }
-    }
-
-    private List<File> collectIndexFiles(List<LogSegment> logSegments) throws IOException {
-        List<File> indexFiles = new ArrayList<>();
-        for (LogSegment segment : logSegments) {
-            indexFiles.add(segment.offsetIndex().file());
-            indexFiles.add(segment.timeIndex().file());
-        }
-        return indexFiles;
-    }
-}
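
The deleted LogLoaderTest pinned down the sanity rules that rebuilt index files
must satisfy: an offset index file's length must be positive and a whole
multiple of its entry size, and a non-empty time index must not end before it
starts. A minimal sketch of those two checks, distilled from the assertion
messages above (IllegalStateException stands in for Fluss's
CorruptIndexException):

    import java.io.File;

    /** Sketch of the two index sanity rules asserted by the deleted tests. */
    final class IndexSanitySketch {
        // Offset index: length must be a positive multiple of the entry size.
        static void checkOffsetIndex(File file, int entrySize) {
            long length = file.length();
            if (length <= 0 || length % entrySize != 0) {
                throw new IllegalStateException(
                        "Index file " + file.getAbsolutePath() + " is corrupt, found "
                                + length + " bytes which is neither positive nor a multiple of "
                                + entrySize);
            }
        }

        // Time index: a non-empty index must not end before it starts.
        static void checkTimeIndex(long sizeInBytes, long firstTimestamp, long lastTimestamp) {
            if (sizeInBytes > 0 && lastTimestamp < firstTimestamp) {
                throw new IllegalStateException(
                        "Corrupt time index found: non-zero size but last timestamp "
                                + lastTimestamp + " is less than first timestamp "
                                + firstTimestamp);
            }
        }
    }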
