This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6a1961b0f36 [To dev/1.3] Fix that WALBuffer waits for flush instead of
file-roll (#17628) (#17633)
6a1961b0f36 is described below
commit 6a1961b0f3625b855231e0be18c47a32e60bcc51
Author: Jiang Tian <[email protected]>
AuthorDate: Tue May 12 09:38:46 2026 +0800
[To dev/1.3] Fix that WALBuffer waits for flush instead of file-roll
(#17628) (#17633)
* Fix that WALBuffer waits for flush instead of file-roll (#17628)
(cherry picked from commit cfbcc249c53cf5ec090c19ed69bb59a05d7efb07)
* fix test compilation
---
.../dataregion/wal/buffer/IWALBuffer.java | 6 +-
.../dataregion/wal/buffer/WALBuffer.java | 18 +-
.../storageengine/dataregion/wal/node/WALNode.java | 10 +-
.../wal/node/WALNodeWaitForRollFileTest.java | 406 +++++++++++++++++++++
4 files changed, 427 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java
index 978ea781162..082dcca8c05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java
@@ -51,7 +51,7 @@ public interface IWALBuffer extends AutoCloseable {
*
* @throws InterruptedException when interrupted by the flush thread
*/
- void waitForFlush() throws InterruptedException;
+ void waitForRollFile() throws InterruptedException;
/**
* Wait for next flush operation done, if the predicate == true after
entering a locked
@@ -60,14 +60,14 @@ public interface IWALBuffer extends AutoCloseable {
* @param waitPredicate the condition which should be satisfied before
waiting.
* @throws InterruptedException when interrupted by the flush thread
*/
- public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws
InterruptedException;
+ public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws
InterruptedException;
/**
* Wait for next flush operation done.
*
* @throws InterruptedException when interrupted by the flush thread
*/
- boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
+ boolean waitForRollFile(long time, TimeUnit unit) throws
InterruptedException;
/** Return true when all wal entries all consumed and flushed. */
boolean isAllWALEntriesConsumed();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 36c404faa7c..08d1ad384bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -88,6 +88,7 @@ public class WALBuffer extends AbstractWALBuffer {
private final Lock buffersLock = new ReentrantLock();
// condition to guarantee correctness of switching buffers
private final Condition idleBufferReadyCondition =
buffersLock.newCondition();
+ private final Condition rollLogWriterCondition = buffersLock.newCondition();
// last writer position when fsync is called, help record each entry's
position
private long lastFsyncPosition;
@@ -168,6 +169,13 @@ public class WALBuffer extends AbstractWALBuffer {
protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus)
throws IOException {
File file = super.rollLogWriter(searchIndex, fileStatus);
currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer);
+ buffersLock.lock();
+ try {
+ // notify WALReader that new file is generated, and it can read new file
+ rollLogWriterCondition.signalAll();
+ } finally {
+ buffersLock.unlock();
+ }
return file;
}
@@ -640,7 +648,7 @@ public class WALBuffer extends AbstractWALBuffer {
}
@Override
- public void waitForFlush() throws InterruptedException {
+ public void waitForRollFile() throws InterruptedException {
buffersLock.lock();
try {
idleBufferReadyCondition.await();
@@ -650,11 +658,11 @@ public class WALBuffer extends AbstractWALBuffer {
}
@Override
- public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws
InterruptedException {
+ public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws
InterruptedException {
buffersLock.lock();
try {
if (waitPredicate.test(this)) {
- idleBufferReadyCondition.await();
+ rollLogWriterCondition.await();
}
} finally {
buffersLock.unlock();
@@ -662,10 +670,10 @@ public class WALBuffer extends AbstractWALBuffer {
}
@Override
- public boolean waitForFlush(long time, TimeUnit unit) throws
InterruptedException {
+ public boolean waitForRollFile(long time, TimeUnit unit) throws
InterruptedException {
buffersLock.lock();
try {
- return idleBufferReadyCondition.await(time, unit);
+ return rollLogWriterCondition.await(time, unit);
} finally {
buffersLock.unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 003c74763da..b0e19364ddb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -92,7 +92,7 @@ public class WALNode implements IWALNode {
// no iot consensus, all insert nodes can be safely deleted
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX =
Long.MAX_VALUE;
// timeout threshold when waiting for next wal entry
- private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
+ public static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
// unique identifier of this WALNode
@@ -733,7 +733,7 @@ public class WALNode implements IWALNode {
while (!hasNext()) {
if (!walFileRolled) {
boolean timeout =
- !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC,
TimeUnit.SECONDS);
+ !buffer.waitForRollFile(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC,
TimeUnit.SECONDS);
if (timeout) {
bufferLastSearchIndex = buffer.getCurrentSearchIndex();
logger.info(
@@ -746,7 +746,7 @@ public class WALNode implements IWALNode {
} else {
// only wait when the search index of the buffer remains the same as
the previous check
long finalBufferLastSearchIndex = bufferLastSearchIndex;
- buffer.waitForFlush(buf -> buf.getCurrentSearchIndex() ==
finalBufferLastSearchIndex);
+ buffer.waitForRollFile(buf -> buf.getCurrentSearchIndex() ==
finalBufferLastSearchIndex);
}
}
}
@@ -755,8 +755,8 @@ public class WALNode implements IWALNode {
public void waitForNextReady(long time, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (!hasNext()) {
- boolean timeout = !buffer.waitForFlush(time, unit);
- if (timeout || !hasNext()) {
+ boolean timeout = !buffer.waitForRollFile(time, unit);
+ if (timeout && !hasNext()) {
throw new TimeoutException();
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
new file mode 100644
index 00000000000..b75efaefca4
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class WALNodeWaitForRollFileTest {
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+ private static final String logDirectory =
+ TestConstant.BASE_OUTPUT_PATH.concat("wal-roll-file-test");
+ private static final String databasePath = "root.test_sg";
+ private static final String devicePath = databasePath + ".test_d";
+ private static final String dataRegionId = "1";
+ private WALMode prevMode;
+ private String prevConsensus;
+ private WALNode walNode;
+ private long originWALThreshold;
+
+ @Before
+ public void setUp() throws Exception {
+ originWALThreshold = config.getWalFileSizeThresholdInByte();
+ EnvironmentUtils.cleanDir(logDirectory);
+ prevMode = config.getWalMode();
+ prevConsensus = config.getDataRegionConsensusProtocolClass();
+ config.setWalMode(WALMode.SYNC);
+ config.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+ config.setWalFileSizeThresholdInByte(2 * 1024 * 1024);
+ walNode = new WALNode(identifier, logDirectory);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ walNode.close();
+ config.setWalMode(prevMode);
+ config.setDataRegionConsensusProtocolClass(prevConsensus);
+ config.setWalFileSizeThresholdInByte(originWALThreshold);
+ EnvironmentUtils.cleanDir(logDirectory);
+ StorageEngine.getInstance().reset();
+ }
+
+ /**
+ * Verifies that waitForNextReady(time, unit) throws TimeoutException when
no WAL data is
+ * available at the requested search index. This uses waitForRollFile
internally.
+ */
+ @Test
+ public void testWaitForNextReadyTimesOutWhenNoData() throws Exception {
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+ assertFalse(iterator.hasNext());
+ try {
+ iterator.waitForNextReady(1, TimeUnit.SECONDS);
+ fail("Expected TimeoutException");
+ } catch (TimeoutException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Verifies that waitForNextReady(time, unit) does NOT wake up from a buffer
flush alone — it
+ * requires a WAL file roll. This is the core behavioral change: the old
waitForFlush would return
+ * on any buffer sync, but waitForRollFile only returns when a new WAL file
is created.
+ */
+ @Test
+ public void testWaitForNextReadyNotWokenByFlushWithoutRoll() throws
Exception {
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + File.separator +
"test.tsfile");
+
+ // write a small amount of data (not enough to trigger roll)
+ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {1});
+ insertTabletNode.setSearchIndex(1);
+ walNode.log(memTable.getMemTableId(), insertTabletNode, 0,
insertTabletNode.getRowCount());
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ // data is flushed to buffer but no WAL file roll happened yet, iterator
at search index 1
+ // should not find data (because the current-writing WAL file is not
readable by the iterator)
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+
+ try {
+ long start = System.currentTimeMillis();
+ iterator.waitForNextReady(2, TimeUnit.SECONDS);
+ if (System.currentTimeMillis() - start
+ < WALNode.WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC * 1000) {
+ fail("The data should not be found before timeout");
+ }
+ } catch (TimeoutException e) {
+ // expected: flush happened but no roll, so waitForRollFile timed out
+ }
+ }
+
+ /**
+ * Verifies that waitForNextReady succeeds after a WAL file roll makes data
readable. The iterator
+ * should wake up when rollLogWriter signals the rollLogWriterCondition.
+ */
+ @Test
+ public void testWaitForNextReadySucceedsAfterRollFile() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + File.separator +
"test.tsfile");
+
+ // write data with search index
+ for (int i = 1; i <= 5; i++) {
+ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {i});
+ insertTabletNode.setSearchIndex(i);
+ walNode.log(memTable.getMemTableId(), insertTabletNode, 0,
insertTabletNode.getRowCount());
+ }
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ // roll the WAL file so the data is in a closed file readable by the
iterator
+ walNode.rollWALFile();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ // iterator at search index 1 should find the data after roll
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+ assertTrue(iterator.hasNext());
+ assertNotNull(iterator.next());
+ }
+
+ /**
+ * Verifies that waitForNextReady wakes up when a WAL file roll is triggered
concurrently. A
+ * background thread rolls the WAL file while the main thread waits on the
iterator.
+ */
+ @Test(timeout = 30000)
+ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + File.separator +
"test.tsfile");
+
+ // write data with search index
+ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {1});
+ insertTabletNode.setSearchIndex(1);
+ walNode.log(memTable.getMemTableId(), insertTabletNode, 0,
insertTabletNode.getRowCount());
+ walNode.log(
+ memTable.getMemTableId(), new
ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+
+ AtomicBoolean found = new AtomicBoolean(false);
+ AtomicReference<Exception> error = new AtomicReference<>();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ // background: wait for data to become available via waitForNextReady
+ Future<?> waitFuture =
+ executor.submit(
+ () -> {
+ try {
+ iterator.waitForNextReady(15, TimeUnit.SECONDS);
+ if (iterator.hasNext()) {
+ found.set(true);
+ }
+ } catch (Exception e) {
+ error.set(e);
+ }
+ });
+
+ // give the waiter thread time to start blocking
+ Thread.sleep(500);
+
+ // trigger WAL file roll — this should signal rollLogWriterCondition and
wake up the iterator
+ walNode.rollWALFile();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ waitFuture.get(20, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ if (error.get() != null) {
+ throw error.get();
+ }
+ assertTrue("Iterator should have found data after WAL file roll",
found.get());
+ }
+
+ /**
+ * Verifies that the no-arg waitForNextReady eventually proceeds when enough
data is written to
+ * trigger an automatic WAL file roll (file size exceeds threshold). Uses a
small WAL file size
+ * threshold to trigger the roll quickly.
+ */
+ @Test(timeout = 60000)
+ public void testWaitForNextReadyWithAutoRollOnSizeThreshold() throws
Exception {
+ // use small WAL file size to trigger auto-roll
+ config.setWalFileSizeThresholdInByte(1024);
+
+ try {
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + File.separator +
"test.tsfile");
+
+ // write initial data with search index
+ InsertTabletNode first = getInsertTabletNode(devicePath, new long[] {1});
+ first.setSearchIndex(1);
+ walNode.log(memTable.getMemTableId(), first, 0, first.getRowCount());
+
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> walNode.isAllWALEntriesConsumed());
+
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+
+ AtomicBoolean found = new AtomicBoolean(false);
+ AtomicReference<Exception> error = new AtomicReference<>();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ Future<?> waitFuture =
+ executor.submit(
+ () -> {
+ try {
+ iterator.waitForNextReady(30, TimeUnit.SECONDS);
+ if (iterator.hasNext()) {
+ found.set(true);
+ }
+ } catch (Exception e) {
+ error.set(e);
+ }
+ });
+
+ Thread.sleep(500);
+
+ // write more data to exceed the small threshold and trigger auto-roll
+ for (int i = 2; i <= 50; i++) {
+ InsertTabletNode node = getInsertTabletNode(devicePath, new long[]
{i});
+ node.setSearchIndex(i);
+ walNode.log(memTable.getMemTableId(), node, 0, node.getRowCount());
+ }
+
+ waitFuture.get(40, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ if (error.get() != null) {
+ fail("waitForNextReady threw unexpected exception: " +
error.get().getMessage());
+ }
+ assertTrue("Iterator should have found data after auto WAL file roll",
found.get());
+ } finally {
+ config.setWalFileSizeThresholdInByte(2 * 1024 * 1024);
+ }
+ }
+
+ /**
+ * Verifies that the no-arg waitForNextReady() automatically triggers a WAL
file roll after the
+ * timeout expires (WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30s). The flow
is: data written to
+ * buffer → waitForRollFile(30s) times out → rollWALFile() called → data
moves to closed file →
+ * hasNext() returns true → method returns.
+ */
+ @Test(timeout = 120000)
+ public void testWaitForNextReadyAutoTriggersRollOnTimeout() throws Exception
{
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + File.separator +
"test.tsfile");
+
+ // write data with search index — stays in the current (active) WAL file
+ InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new
long[] {1});
+ insertTabletNode.setSearchIndex(1);
+ walNode.log(memTable.getMemTableId(), insertTabletNode, 0,
insertTabletNode.getRowCount());
+ walNode.log(
+ memTable.getMemTableId(), new
ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
walNode.isAllWALEntriesConsumed());
+
+ // iterator cannot read the active WAL file, so hasNext() should be false
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+ assertFalse("Data should not be visible before WAL file roll",
iterator.hasNext());
+
+ AtomicBoolean found = new AtomicBoolean(false);
+ AtomicReference<Exception> error = new AtomicReference<>();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ long startTime = System.currentTimeMillis();
+
+ // call the no-arg waitForNextReady() — it should:
+ // 1) wait 30s for rollLogWriterCondition (timeout)
+ // 2) auto-call rollWALFile()
+ // 3) data becomes readable, hasNext() returns true, method returns
+ Future<?> waitFuture =
+ executor.submit(
+ () -> {
+ try {
+ iterator.waitForNextReady();
+ if (iterator.hasNext()) {
+ found.set(true);
+ }
+ } catch (Exception e) {
+ error.set(e);
+ }
+ });
+
+ waitFuture.get(90, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ long elapsed = System.currentTimeMillis() - startTime;
+
+ if (error.get() != null) {
+ fail("waitForNextReady() threw unexpected exception: " +
error.get().getMessage());
+ }
+ assertTrue("Iterator should have found data after auto-triggered WAL file
roll", found.get());
+ assertTrue(
+ "Should have waited at least 30s for the timeout to trigger auto-roll,
but only waited "
+ + elapsed
+ + "ms",
+ elapsed >=
TimeUnit.SECONDS.toMillis(WALNode.WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC - 1));
+ }
+
+ private InsertTabletNode getInsertTabletNode(String devicePath, long[] times)
+ throws IllegalPathException {
+ String[] measurements = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"};
+ TSDataType[] dataTypes = new TSDataType[6];
+ dataTypes[0] = TSDataType.DOUBLE;
+ dataTypes[1] = TSDataType.FLOAT;
+ dataTypes[2] = TSDataType.INT64;
+ dataTypes[3] = TSDataType.INT32;
+ dataTypes[4] = TSDataType.BOOLEAN;
+ dataTypes[5] = TSDataType.TEXT;
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[times.length];
+ columns[1] = new float[times.length];
+ columns[2] = new long[times.length];
+ columns[3] = new int[times.length];
+ columns[4] = new boolean[times.length];
+ columns[5] = new Binary[times.length];
+
+ for (int r = 0; r < times.length; r++) {
+ ((double[]) columns[0])[r] = 1.0d + r;
+ ((float[]) columns[1])[r] = 2.0f + r;
+ ((long[]) columns[2])[r] = 10000L + r;
+ ((int[]) columns[3])[r] = 100 + r;
+ ((boolean[]) columns[4])[r] = (r % 2 == 0);
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r,
TSFileConfig.STRING_CHARSET);
+ }
+
+ BitMap[] bitMaps = new BitMap[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(times.length);
+ }
+ bitMaps[i].mark(i % times.length);
+ }
+ MeasurementSchema[] schemas = new MeasurementSchema[6];
+ for (int i = 0; i < 6; i++) {
+ schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i],
TSEncoding.PLAIN);
+ }
+
+ return new InsertTabletNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ measurements,
+ dataTypes,
+ schemas,
+ times,
+ bitMaps,
+ columns,
+ times.length);
+ }
+}