This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 22325e17 Add FlushChunkMetadataListener (#328)
22325e17 is described below
commit 22325e17daba8769364f7a868bc30a56c5c3b108
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Dec 10 16:38:30 2024 +0800
Add FlushChunkMetadataListener (#328)
* Add FlushChunkMetadataListener
* Fix and add test
---
.../write/writer/FlushChunkMetadataListener.java | 33 +++++++++++++++++
.../apache/tsfile/write/writer/TsFileIOWriter.java | 43 ++++++++++++++++++++--
.../writer/TsFileIOWriterMemoryControlTest.java | 43 ++++++++++++++++++++++
3 files changed, 115 insertions(+), 4 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
new file mode 100644
index 00000000..237cb191
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.List;
+
+@FunctionalInterface
+public interface FlushChunkMetadataListener {
+
+ // Pair<device id, measurement id> -> chunk metadata list
+ void onFlush(List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
sortedChunkMetadataList);
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 672fbb8e..54244ed0 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -131,6 +131,8 @@ public class TsFileIOWriter implements AutoCloseable {
protected String encryptKey;
+ private final List<FlushChunkMetadataListener> flushListeners = new
ArrayList<>();
+
/** empty construct function. */
protected TsFileIOWriter() {
if (TS_FILE_CONFIG.getEncryptFlag()) {
@@ -210,6 +212,10 @@ public class TsFileIOWriter implements AutoCloseable {
this.encryptKey = encryptKey;
}
+ public void addFlushListener(FlushChunkMetadataListener listener) {
+ flushListeners.add(listener);
+ }
+
/**
* Writes given bytes to output stream. This method is called when total
memory size exceeds the
* chunk group size threshold.
@@ -665,6 +671,19 @@ public class TsFileIOWriter implements AutoCloseable {
this.maxPlanIndex = maxPlanIndex;
}
+ public long getMaxMetadataSize() {
+ return maxMetadataSize;
+ }
+
+ /**
+ * Set the max memory size of chunk metadata. Note that the new size may be
smaller than the current
+ * chunk metadata size, so the caller should call {@link
#checkMetadataSizeAndMayFlush()} after
+ * this to avoid violating memory control.
+ */
+ public void setMaxMetadataSize(long maxMetadataSize) {
+ this.maxMetadataSize = maxMetadataSize;
+ }
+
/**
* Check if the size of chunk metadata in memory is greater than the given
threshold. If so, the
* chunk metadata will be written to a temp files. <b>Notice! If you are
writing a aligned device
@@ -704,25 +723,41 @@ public class TsFileIOWriter implements AutoCloseable {
protected int sortAndFlushChunkMetadata() throws IOException {
int writtenSize = 0;
// group by series
- List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+ final List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
TSMIterator.sortChunkMetadata(
chunkGroupMetadataList, currentChunkGroupDeviceId,
chunkMetadataList);
if (tempOutput == null) {
tempOutput = new LocalTsFileOutput(new
FileOutputStream(chunkMetadataTempFile));
}
hasChunkMetadataInDisk = true;
+
+ // This list is the same as sortedChunkMetadataList, but Path is replaced
by Pair<IDeviceID,
+ // String>
+ final List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
+ sortedChunkMetadataListForCallBack = new ArrayList<>();
+
for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
- Path seriesPath = pair.left;
- boolean isNewPath = !seriesPath.equals(lastSerializePath);
+ final Path seriesPath = pair.left;
+ final boolean isNewPath = !seriesPath.equals(lastSerializePath);
if (isNewPath) {
// record the count of path to construct bloom filter later
pathCount++;
}
- List<IChunkMetadata> iChunkMetadataList = pair.right;
+ final List<IChunkMetadata> iChunkMetadataList = pair.right;
writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList,
seriesPath, isNewPath);
lastSerializePath = seriesPath;
+ sortedChunkMetadataListForCallBack.add(
+ new Pair<>(
+ new Pair<>(seriesPath.getIDeviceID(),
seriesPath.getMeasurement()),
+ iChunkMetadataList));
logger.debug("Flushing {}", seriesPath);
}
+
+ // notify the listeners
+ for (final FlushChunkMetadataListener listener : flushListeners) {
+ listener.onFlush(sortedChunkMetadataListForCallBack);
+ }
+
// clear the cache metadata to release the memory
chunkGroupMetadataList.clear();
if (chunkMetadataList != null) {
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
index bb47294e..77ffdab0 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -56,6 +56,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
public class TsFileIOWriterMemoryControlTest {
private static File testFile = new File("target", "1-1-0-0.tsfile");
@@ -248,6 +249,48 @@ public class TsFileIOWriterMemoryControlTest {
}
}
+ /** The following test is for calling listeners after flushing chunk
metadata. */
+ @Test
+ public void testFlushChunkMetadataListener() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 *
10)) {
+ final AtomicInteger cnt1 = new AtomicInteger(0);
+ final AtomicInteger cnt2 = new AtomicInteger(0);
+ writer.addFlushListener(sortedChunkMetadataList ->
cnt1.incrementAndGet());
+ writer.addFlushListener(sortedChunkMetadataList ->
cnt2.incrementAndGet());
+ List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ IDeviceID deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ generateBooleanData(1, 0L, new
ArrayList<>()).writeToFileWriter(writer);
+ generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile,
+ writer.chunkGroupMetadataList,
+ writer.endPosInCMTForDevice);
+ for (int i = 0; iterator.hasNext(); ++i) {
+ Pair<Path, TimeseriesMetadata> timeseriesMetadataPair =
iterator.next();
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ Assert.assertEquals(sortedSeriesId.get(i % 5),
timeseriesMetadata.getMeasurementId());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(),
timeseriesMetadata.getTsDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(),
timeseriesMetadata.getStatistics());
+ }
+ Assert.assertEquals(1, cnt1.get());
+ Assert.assertEquals(1, cnt2.get());
+ }
+ }
+
/** The following tests is for writing normal series in different nums. */
/**