This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d5cb52147 [server] Recover log and index file for unclean shutdown (#1749)
d5cb52147 is described below
commit d5cb521471eeac7a07120ecde1a199c61c62dd02
Author: Liebing <[email protected]>
AuthorDate: Mon Nov 10 20:01:09 2025 +0800
[server] Recover log and index file for unclean shutdown (#1749)
---
.../org/apache/fluss/server/log/LogLoader.java | 151 +++++++++-
.../org/apache/fluss/server/log/LogSegment.java | 20 +-
.../org/apache/fluss/server/log/LogTablet.java | 2 +-
.../fluss/server/log/WriterStateManager.java | 4 +
.../org/apache/fluss/server/log/LogLoaderTest.java | 325 +++++++++++++++++++++
5 files changed, 499 insertions(+), 3 deletions(-)
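[Editor's note] For readers skimming the diff below: the heart of this change is LogLoader.recoverLog(), which now replays every segment above the recovery checkpoint after an unclean shutdown and drops everything after the first corruption it finds. A minimal, self-contained sketch of that control flow follows; the Segment interface and its recover() contract are simplified stand-ins for illustration, not Fluss APIs.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class RecoveryLoopSketch {

    /** Simplified stand-in for a log segment above the recovery point. */
    interface Segment {
        long baseOffset();
        /** Rebuilds the segment's indexes from its log file and returns the
         *  number of invalid trailing bytes truncated; throws on invalid offsets. */
        int recover() throws IllegalStateException;
    }

    /** Replays unflushed segments in order; the first corrupt one truncates the log. */
    static void recoverUnflushed(List<Segment> unflushed) {
        Iterator<Segment> it = unflushed.iterator();
        boolean truncated = false;
        while (it.hasNext() && !truncated) {
            Segment segment = it.next();
            int truncatedBytes;
            try {
                truncatedBytes = segment.recover();
            } catch (IllegalStateException invalidOffset) {
                // invalid offset: keep the segment but empty it at its base offset
                truncatedBytes = Integer.MAX_VALUE;
            }
            if (truncatedBytes > 0) {
                // corruption found: every later segment is untrustworthy, drop them all
                List<Segment> toDelete = new ArrayList<>();
                it.forEachRemaining(toDelete::add);
                toDelete.forEach(s -> System.out.println("deleting segment " + s.baseOffset()));
                truncated = true;
            }
        }
    }
}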
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index 62d38581c..95f41c021 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.log;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidOffsetException;
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.metadata.LogFormat;
@@ -31,8 +32,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -117,6 +123,37 @@ final class LogLoader {
nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
}
+ /**
+ * Just recovers the given segment, without adding it to the provided segments.
+ *
+ * @param segment Segment to recover
+ * @return The number of bytes truncated from the segment
+ * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
+ * offset overflow
+ */
+ private int recoverSegment(LogSegment segment) throws IOException {
+ WriterStateManager writerStateManager =
+ new WriterStateManager(
+ logSegments.getTableBucket(),
+ logTabletDir,
+ this.writerStateManager.writerExpirationMs());
+ // TODO: Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
+ // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
+ // there may be cases where logStartOffset is not updated. As a result, logStartOffset is
+ // not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
+ // issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
+ // Additionally, using 0 versus using logStartOffset does not affect correctness; both
+ // can restore the complete WriterState. The only difference is that using logStartOffset
+ // can potentially skip over more segments.
+ LogTablet.rebuildWriterState(
+ writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
+ int bytesTruncated = segment.recover();
+ // once we have recovered the segment's data, take a snapshot to ensure that we won't
+ // need to reload the same segment again while recovering another segment.
+ writerStateManager.takeSnapshot();
+ return bytesTruncated;
+ }
+
/**
* Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
* active segment, and returns the updated recovery point and next offset after recovery.
@@ -129,14 +166,106 @@ final class LogLoader {
* overflow
*/
private Tuple2<Long, Long> recoverLog() throws IOException {
- // TODO truncate log to recover maybe unflush segments.
+ if (!isCleanShutdown) {
+ List<LogSegment> unflushed =
+ logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
+ int numUnflushed = unflushed.size();
+ Iterator<LogSegment> unflushedIter = unflushed.iterator();
+ boolean truncated = false;
+ int numFlushed = 1;
+
+ while (unflushedIter.hasNext() && !truncated) {
+ LogSegment segment = unflushedIter.next();
+ LOG.info(
+ "Recovering unflushed segment {}. {}/{} recovered for
bucket {}",
+ segment.getBaseOffset(),
+ numFlushed,
+ numUnflushed,
+ logSegments.getTableBucket());
+
+ int truncatedBytes = -1;
+ try {
+ truncatedBytes = recoverSegment(segment);
+ } catch (Exception e) {
+ if (e instanceof InvalidOffsetException) {
+ long startOffset = segment.getBaseOffset();
+ LOG.warn(
+ "Found invalid offset during recovery for
bucket {}. Deleting the corrupt segment "
+ + "and creating an empty one with
starting offset {}",
+ logSegments.getTableBucket(),
+ startOffset);
+ truncatedBytes = segment.truncateTo(startOffset);
+ } else {
+ throw e;
+ }
+ }
+
+ if (truncatedBytes > 0) {
+ // we had an invalid message, delete all remaining log
+ LOG.warn(
+ "Corruption found in segment {} for bucket {},
truncating to offset {}",
+ segment.getBaseOffset(),
+ logSegments.getTableBucket(),
+ segment.readNextOffset());
+ removeAndDeleteSegments(unflushedIter);
+ truncated = true;
+ } else {
+ numFlushed += 1;
+ }
+ }
+ }
+
if (logSegments.isEmpty()) {
+ // TODO: use logStartOffset once issue https://github.com/apache/fluss/issues/744 is resolved
logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
}
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
}
+ /**
+ * This method deletes the given log segments and the associated writer snapshots.
+ *
+ * <p>This method does not need to convert IOException to {@link LogStorageException} because it
+ * is either called before all logs are loaded or the immediate caller will catch and handle
+ * IOException
+ *
+ * @param segmentsToDelete The log segments to schedule for deletion
+ */
+ private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
+ if (segmentsToDelete.hasNext()) {
+ List<LogSegment> toDelete = new ArrayList<>();
+ segmentsToDelete.forEachRemaining(toDelete::add);
+
+ LOG.info(
+ "Deleting segments for bucket {} as part of log recovery:
{}",
+ logSegments.getTableBucket(),
+ toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+ toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
+
+ try {
+ LocalLog.deleteSegmentFiles(
+ toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to delete truncated segments {} for bucket {}",
+ toDelete,
+ logSegments.getTableBucket(),
+ e);
+ }
+
+ try {
+ LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to delete truncated writer snapshots {} for
bucket {}",
+ toDelete,
+ logSegments.getTableBucket(),
+ e);
+ }
+ }
+ }
+
/** Loads segments from disk into the provided segments. */
private void loadSegmentFiles() throws IOException {
File[] sortedFiles = logTabletDir.listFiles();
@@ -156,8 +285,28 @@ final class LogLoader {
}
} else if (LocalLog.isLogFile(file)) {
long baseOffset = FlussPaths.offsetFromFile(file);
+ boolean timeIndexFileNewlyCreated =
+ !FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
LogSegment segment =
LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
+
+ try {
+ segment.sanityCheck(timeIndexFileNewlyCreated);
+ } catch (IOException e) {
+ if (e instanceof NoSuchFileException) {
+ if (isCleanShutdown
|| segment.getBaseOffset() < recoveryPointCheckpoint) {
LOG.error(
+ "Could not find offset index file corresponding to log file {} "
+ + "for bucket {}, recovering segment and rebuilding index files...",
+ segment.getFileLogRecords().file().getAbsoluteFile(),
+ logSegments.getTableBucket());
+ }
+ recoverSegment(segment);
+ } else {
+ throw e;
+ }
+ }
logSegments.add(segment);
}
}
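[Editor's note] The loadSegmentFiles() change above pairs with the new LogSegment.sanityCheck() in the next file: a missing offset index is signalled with NoSuchFileException and answered by recovering (i.e. rebuilding the indexes of) that one segment. A compact sketch of that handshake, with a plain File check standing in for Fluss's index plumbing; all names here are illustrative:

import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;

final class MissingIndexSketch {

    /** Mirrors the shape of sanityCheck(): absent index file -> NoSuchFileException. */
    static void sanityCheck(File offsetIndexFile) throws IOException {
        if (!offsetIndexFile.exists()) {
            throw new NoSuchFileException(
                    "Offset index file " + offsetIndexFile.getAbsolutePath() + " does not exist.");
        }
    }

    /** Mirrors the caller in loadSegmentFiles(): rebuild on a missing index, rethrow otherwise. */
    static void checkOrRecover(File offsetIndexFile, Runnable recoverSegment) throws IOException {
        try {
            sanityCheck(offsetIndexFile);
        } catch (NoSuchFileException e) {
            // a missing index is recoverable: rebuild it from the log file
            recoverSegment.run();
        }
    }
}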
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 0a30d5f5f..93bdbdb65 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -44,6 +44,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.File;
import java.io.IOException;
+import java.nio.file.NoSuchFileException;
import java.util.Optional;
import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
@@ -172,6 +173,23 @@ public final class LogSegment {
timeIndex().resize(size);
}
+ public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
+ if (lazyOffsetIndex.file().exists()) {
+ // Resize the time index file to 0 if it is newly created.
+ if (timeIndexFileNewlyCreated) {
+ timeIndex().resize(0);
+ }
+ // Sanity checks for time index and offset index are skipped because
+ // we will recover the segments above the recovery point in recoverLog()
+ // in any case so sanity checking them here is redundant.
+ } else {
+ throw new NoSuchFileException(
+ "Offset index file "
+ + lazyOffsetIndex.file().getAbsolutePath()
+ + " does not exist.");
+ }
+ }
+
/**
* The maximum timestamp we see so far.
*
@@ -284,7 +302,7 @@ public final class LogSegment {
* Run recovery on the given segment. This will rebuild the index from the log file and lop off
* any invalid bytes from the end of the log and index.
*/
- public int recover() throws Exception {
+ public int recover() throws IOException {
offsetIndex().reset();
timeIndex().reset();
int validBytes = 0;
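[Editor's note] The recover() signature change above (Exception narrowed to IOException) sits on the method where truncation counts come from: recovery rescans the log file batch by batch, rebuilding the indexes and keeping a running total of valid bytes, then lops off whatever follows the last valid batch. Conceptually, under the simplifying assumption that a segment is a list of self-validating batches (the Batch type below is a stand-in, not a Fluss API):

import java.util.List;

final class SegmentRecoverSketch {

    /** Stand-in for a record batch; Fluss's real batches carry offsets, checksums, etc. */
    interface Batch {
        int sizeInBytes();
        boolean isValid(); // e.g. checksum passes and offsets are monotonic
    }

    /** Returns the number of trailing bytes to truncate from the segment. */
    static int recover(List<Batch> batches, int segmentSizeInBytes) {
        int validBytes = 0;
        for (Batch batch : batches) {
            if (!batch.isValid()) {
                break; // everything after the first bad batch is suspect
            }
            validBytes += batch.sizeInBytes();
        }
        return segmentSizeInBytes - validBytes;
    }
}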
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index bf75410e4..8a5c54cbd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1283,7 +1283,7 @@ public final class LogTablet {
loadedWriters.values().forEach(writerStateManager::update);
}
- private static void deleteWriterSnapshots(
+ public static void deleteWriterSnapshots(
List<LogSegment> segments, WriterStateManager writerStateManager) throws IOException {
for (LogSegment segment : segments) {
writerStateManager.removeAndDeleteSnapshot(segment.getBaseOffset());
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index 4fa0baced..30afb849a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -99,6 +99,10 @@ public class WriterStateManager {
this.snapshots = loadSnapshots();
}
+ public int writerExpirationMs() {
+ return writerExpirationMs;
+ }
+
public int writerIdCount() {
return writerIdCount;
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
new file mode 100644
index 000000000..7cec74533
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.exception.CorruptIndexException;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.ManualClock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogLoader}. */
+final class LogLoaderTest extends LogTestBase {
+
+ private @TempDir File tempDir;
+ private FlussScheduler scheduler;
+ private File logDir;
+ private Clock clock;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("10kb"));
+ conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, MemorySize.parse("1b"));
+
+ logDir =
+ LogTestUtils.makeRandomLogTabletDir(
+ tempDir,
+ DATA1_TABLE_PATH.getDatabaseName(),
+ DATA1_TABLE_ID,
+ DATA1_TABLE_PATH.getTableName());
+
+ scheduler = new FlussScheduler(1);
+ scheduler.startup();
+
+ clock = new ManualClock();
+ }
+
+ // TODO: add more tests like Kafka LogLoaderTest
+
+ @Test
+ void testCorruptIndexRebuild() throws Exception {
+ // publish the records and close the log
+ int numRecords = 200;
+ LogTablet logTablet = createLogTablet(true);
+ appendRecords(logTablet, numRecords);
+ // collect all the index files
+ List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+ logTablet.close();
+
+ // corrupt all the index files
+ for (File indexFile : indexFiles) {
+ try (FileChannel fileChannel =
+ FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
+ for (int i = 0; i < 12; i++) {
+ fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+ }
+ }
+ }
+
+ // reopen the log without recovery; sanity checks of the index files should throw exceptions
+ logTablet = createLogTablet(true);
+ for (LogSegment segment : logTablet.logSegments()) {
+ if (segment.getBaseOffset() != logTablet.activeLogSegment().getBaseOffset()) {
+ assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+ .isInstanceOf(CorruptIndexException.class)
+ .hasMessage(
+ String.format(
+ "Index file %s is corrupt, found %d
bytes which is neither positive nor a multiple of %d",
+
segment.offsetIndex().file().getAbsolutePath(),
+ segment.offsetIndex().length(),
+ segment.offsetIndex().entrySize()));
+ assertThatThrownBy(segment.timeIndex()::sanityCheck)
+ .isInstanceOf(CorruptIndexException.class)
+ .hasMessageContaining(
+ String.format(
+ "Corrupt time index found, time index
file (%s) has non-zero size but the last timestamp is 0 which is less than the
first timestamp",
+
segment.timeIndex().file().getAbsolutePath()));
+ } else {
+ // the offset index file of the active segment will be resized, in which case no
+ // corruption exception is thrown when doing the sanity check
+ segment.offsetIndex().sanityCheck();
+ assertThatThrownBy(segment.timeIndex()::sanityCheck)
+ .isInstanceOf(CorruptIndexException.class)
+ .hasMessageContaining(
+ String.format(
+ "Corrupt time index found, time index
file (%s) has non-zero size but the last timestamp is 0 which is less than the
first timestamp",
+
segment.timeIndex().file().getAbsolutePath()));
+ }
+ }
+ logTablet.close();
+
+ // reopen the log with recovery; sanity checks of the index files should find no corruption
+ logTablet = createLogTablet(false);
+ for (LogSegment segment : logTablet.logSegments()) {
+ segment.offsetIndex().sanityCheck();
+ segment.timeIndex().sanityCheck();
+ }
+ assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+ for (int i = 0; i < numRecords; i++) {
+ assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10))
+ .isEqualTo(i);
+ }
+ logTablet.close();
+ }
+
+ @Test
+ void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
+ // publish the records and close the log
+ int numRecords = 200;
+ LogTablet logTablet = createLogTablet(true);
+ appendRecords(logTablet, numRecords);
+ // collect all the index files
+ long recoveryPoint = logTablet.localLogEndOffset() / 2;
+ List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+ logTablet.close();
+
+ // corrupt all the index files
+ for (File indexFile : indexFiles) {
+ try (FileChannel fileChannel =
+ FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
+ for (int i = 0; i < 12; i++) {
+ fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+ }
+ }
+ }
+
+ // test reopen the log with recovery point
+ logTablet = createLogTablet(false, recoveryPoint);
+ List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint, Long.MAX_VALUE);
+ assertThat(logSegments.size() < logTablet.logSegments().size()).isTrue();
+ Set<Long> recoveredSegments =
+ logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
+ for (LogSegment segment : logTablet.logSegments()) {
+ if (recoveredSegments.contains(segment.getBaseOffset())) {
+ segment.offsetIndex().sanityCheck();
+ segment.timeIndex().sanityCheck();
+ } else {
+ // the segments before recovery point will not be recovered, so sanity check should
+ // still throw corrupt exception
+ assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+ .isInstanceOf(CorruptIndexException.class)
+ .hasMessage(
+ String.format(
+ "Index file %s is corrupt, found %d
bytes which is neither positive nor a multiple of %d",
+
segment.offsetIndex().file().getAbsolutePath(),
+ segment.offsetIndex().length(),
+ segment.offsetIndex().entrySize()));
+ assertThatThrownBy(segment.timeIndex()::sanityCheck)
+ .isInstanceOf(CorruptIndexException.class)
+ .hasMessageContaining(
+ String.format(
+ "Corrupt time index found, time index
file (%s) has non-zero size but the last timestamp is 0 which is less than the
first timestamp",
+
segment.timeIndex().file().getAbsolutePath()));
+ }
+ }
+ }
+
+ @Test
+ void testIndexRebuild() throws Exception {
+ // publish the records and close the log
+ int numRecords = 200;
+ LogTablet logTablet = createLogTablet(true);
+ appendRecords(logTablet, numRecords);
+ // collect all index files
+ List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+ logTablet.close();
+
+ // delete all the index files
+ indexFiles.forEach(File::delete);
+
+ // reopen the log
+ logTablet = createLogTablet(false);
+ assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
+ // the index files should be rebuilt
+ assertThat(logTablet.logSegments().get(0).offsetIndex().entries() > 0).isTrue();
+ assertThat(logTablet.logSegments().get(0).timeIndex().entries() > 0).isTrue();
+ for (int i = 0; i < numRecords; i++) {
+ assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10))
+ .isEqualTo(i);
+ }
+ logTablet.close();
+ }
+
+ @Test
+ void testInvalidOffsetRebuild() throws Exception {
+ // publish the records and close the log
+ int numRecords = 200;
+ LogTablet logTablet = createLogTablet(true);
+ appendRecords(logTablet, numRecords);
+
+ List<LogSegment> logSegments = logTablet.logSegments();
+ int corruptSegmentIndex = logSegments.size() / 2;
+ assertThat(corruptSegmentIndex < logSegments.size()).isTrue();
+ LogSegment corruptSegment = logSegments.get(corruptSegmentIndex);
+
+ // append an invalid offset batch
+ List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
+ List<ChangeType> changeTypes =
+ objects.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList());
+ MemoryLogRecords memoryLogRecords =
+ createBasicMemoryLogRecords(
+ DATA1_ROW_TYPE,
+ DEFAULT_SCHEMA_ID,
+ corruptSegment.getBaseOffset(),
+ clock.milliseconds(),
+ magic,
+ System.currentTimeMillis(),
+ 0,
+ changeTypes,
+ objects,
+ LogFormat.ARROW,
+ ArrowCompressionInfo.DEFAULT_COMPRESSION);
+ corruptSegment.getFileLogRecords().append(memoryLogRecords);
+ logTablet.close();
+
+ logTablet = createLogTablet(false);
+ // the corrupt segment should be truncated to base offset
+ assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
+ // segments after the corrupt segment should be removed
+ assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
+ }
+
+ private LogTablet createLogTablet(boolean isCleanShutdown) throws Exception {
+ return createLogTablet(isCleanShutdown, 0);
+ }
+
+ private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint)
+ throws Exception {
+ return LogTablet.create(
+ PhysicalTablePath.of(DATA1_TABLE_PATH),
+ logDir,
+ conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
+ recoveryPoint,
+ scheduler,
+ LogFormat.ARROW,
+ 1,
+ false,
+ SystemClock.getInstance(),
+ isCleanShutdown);
+ }
+
+ private void appendRecords(LogTablet logTablet, int numRecords) throws Exception {
+ int baseOffset = 0;
+ int batchSequence = 0;
+ for (int i = 0; i < numRecords; i++) {
+ List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
+ List<ChangeType> changeTypes =
+ objects.stream()
+ .map(row -> ChangeType.APPEND_ONLY)
+ .collect(Collectors.toList());
+ MemoryLogRecords memoryLogRecords =
+ createBasicMemoryLogRecords(
+ DATA1_ROW_TYPE,
+ DEFAULT_SCHEMA_ID,
+ baseOffset,
+ clock.milliseconds() + i * 10L,
+ magic,
+ System.currentTimeMillis(),
+ batchSequence,
+ changeTypes,
+ objects,
+ LogFormat.ARROW,
+ ArrowCompressionInfo.DEFAULT_COMPRESSION);
+ logTablet.appendAsFollower(memoryLogRecords);
+ baseOffset++;
+ batchSequence++;
+ }
+ }
+
+ private List<File> collectIndexFiles(List<LogSegment> logSegments) throws IOException {
+ List<File> indexFiles = new ArrayList<>();
+ for (LogSegment segment : logSegments) {
+ indexFiles.add(segment.offsetIndex().file());
+ indexFiles.add(segment.timeIndex().file());
+ }
+ return indexFiles;
+ }
+}
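[Editor's note] On the corruption trick these tests use: appending 12 zero bytes to every index file trips both index types, assuming entry sizes like Kafka's (8-byte offset index entries, 12-byte time index entries; an assumption based on the Kafka lineage noted in the file headers, not confirmed by the diff). The 12 stray bytes misalign the offset index, while parsing as exactly one all-zero time index entry whose timestamp 0 precedes the first real timestamp, which matches the two distinct failure messages asserted above. A toy version of the alignment check:

final class IndexAlignmentSketch {

    /** An index file is sane only if it holds a whole number of fixed-size entries. */
    static boolean isAligned(long fileLengthBytes, int entrySizeBytes) {
        return fileLengthBytes >= 0 && fileLengthBytes % entrySizeBytes == 0;
    }

    public static void main(String[] args) {
        int offsetEntry = 8;  // assumed offset index entry size
        int timeEntry = 12;   // assumed time index entry size
        System.out.println(isAligned(160, offsetEntry));      // true: 20 whole entries
        System.out.println(isAligned(160 + 12, offsetEntry)); // false: 12 stray bytes misalign it
        System.out.println(isAligned(240, timeEntry));        // true: 20 whole entries
        System.out.println(isAligned(240 + 12, timeEntry));   // still true: one bogus all-zero entry
    }
}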