This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 22325e17 Add FlushChunkMetadataListener (#328)
22325e17 is described below

commit 22325e17daba8769364f7a868bc30a56c5c3b108
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Dec 10 16:38:30 2024 +0800

    Add FlushChunkMetadataListener (#328)
    
    * Add FlushChunkMetadataListener
    
    * Fix and add test
---
 .../write/writer/FlushChunkMetadataListener.java   | 33 +++++++++++++++++
 .../apache/tsfile/write/writer/TsFileIOWriter.java | 43 ++++++++++++++++++++--
 .../writer/TsFileIOWriterMemoryControlTest.java    | 43 ++++++++++++++++++++++
 3 files changed, 115 insertions(+), 4 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
new file mode 100644
index 00000000..237cb191
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.List;
+
+@FunctionalInterface
+public interface FlushChunkMetadataListener {
+
+  // Pair<device id, measurement id> -> chunk metadata list
+  void onFlush(List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>> 
sortedChunkMetadataList);
+}
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 672fbb8e..54244ed0 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -131,6 +131,8 @@ public class TsFileIOWriter implements AutoCloseable {
 
   protected String encryptKey;
 
+  private final List<FlushChunkMetadataListener> flushListeners = new 
ArrayList<>();
+
   /** empty construct function. */
   protected TsFileIOWriter() {
     if (TS_FILE_CONFIG.getEncryptFlag()) {
@@ -210,6 +212,10 @@ public class TsFileIOWriter implements AutoCloseable {
     this.encryptKey = encryptKey;
   }
 
+  public void addFlushListener(FlushChunkMetadataListener listener) {
+    flushListeners.add(listener);
+  }
+
   /**
    * Writes given bytes to output stream. This method is called when total 
memory size exceeds the
    * chunk group size threshold.
@@ -665,6 +671,19 @@ public class TsFileIOWriter implements AutoCloseable {
     this.maxPlanIndex = maxPlanIndex;
   }
 
+  public long getMaxMetadataSize() {
+    return maxMetadataSize;
+  }
+
+  /**
+   * Set the max memory size of chunk metadata. Note that the new size may be 
larger than current
+   * chunk metadata size, so the caller should call {@link 
#checkMetadataSizeAndMayFlush()} after
+   * this to avoid violating memory control.
+   */
+  public void setMaxMetadataSize(long maxMetadataSize) {
+    this.maxMetadataSize = maxMetadataSize;
+  }
+
   /**
    * Check if the size of chunk metadata in memory is greater than the given 
threshold. If so, the
    * chunk metadata will be written to a temp file. <b>Notice! If you are 
writing an aligned device
@@ -704,25 +723,41 @@ public class TsFileIOWriter implements AutoCloseable {
   protected int sortAndFlushChunkMetadata() throws IOException {
     int writtenSize = 0;
     // group by series
-    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+    final List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
         TSMIterator.sortChunkMetadata(
             chunkGroupMetadataList, currentChunkGroupDeviceId, 
chunkMetadataList);
     if (tempOutput == null) {
       tempOutput = new LocalTsFileOutput(new 
FileOutputStream(chunkMetadataTempFile));
     }
     hasChunkMetadataInDisk = true;
+
+    // This list is the same as sortedChunkMetadataList, but Path is replaced 
by Pair<IDeviceID,
+    // String>
+    final List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
+        sortedChunkMetadataListForCallBack = new ArrayList<>();
+
     for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
-      Path seriesPath = pair.left;
-      boolean isNewPath = !seriesPath.equals(lastSerializePath);
+      final Path seriesPath = pair.left;
+      final boolean isNewPath = !seriesPath.equals(lastSerializePath);
       if (isNewPath) {
         // record the count of path to construct bloom filter later
         pathCount++;
       }
-      List<IChunkMetadata> iChunkMetadataList = pair.right;
+      final List<IChunkMetadata> iChunkMetadataList = pair.right;
       writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, 
seriesPath, isNewPath);
       lastSerializePath = seriesPath;
+      sortedChunkMetadataListForCallBack.add(
+          new Pair<>(
+              new Pair<>(seriesPath.getIDeviceID(), 
seriesPath.getMeasurement()),
+              iChunkMetadataList));
       logger.debug("Flushing {}", seriesPath);
     }
+
+    // notify the listeners
+    for (final FlushChunkMetadataListener listener : flushListeners) {
+      listener.onFlush(sortedChunkMetadataListForCallBack);
+    }
+
     // clear the cache metadata to release the memory
     chunkGroupMetadataList.clear();
     if (chunkMetadataList != null) {
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
 
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
index bb47294e..77ffdab0 100644
--- 
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
+++ 
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -56,6 +56,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TsFileIOWriterMemoryControlTest {
   private static File testFile = new File("target", "1-1-0-0.tsfile");
@@ -248,6 +249,48 @@ public class TsFileIOWriterMemoryControlTest {
     }
   }
 
+  /** The following test is for calling listeners after flushing chunk 
metadata. */
+  @Test
+  public void testFlushChunkMetadataListener() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 
10)) {
+      final AtomicInteger cnt1 = new AtomicInteger(0);
+      final AtomicInteger cnt2 = new AtomicInteger(0);
+      writer.addFlushListener(sortedChunkMetadataList -> 
cnt1.incrementAndGet());
+      writer.addFlushListener(sortedChunkMetadataList -> 
cnt2.incrementAndGet());
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        IDeviceID deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateBooleanData(1, 0L, new 
ArrayList<>()).writeToFileWriter(writer);
+        generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile,
+              writer.chunkGroupMetadataList,
+              writer.endPosInCMTForDevice);
+      for (int i = 0; iterator.hasNext(); ++i) {
+        Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = 
iterator.next();
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(sortedSeriesId.get(i % 5), 
timeseriesMetadata.getMeasurementId());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(), 
timeseriesMetadata.getTsDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(), 
timeseriesMetadata.getStatistics());
+      }
+      Assert.assertEquals(1, cnt1.get());
+      Assert.assertEquals(1, cnt2.get());
+    }
+  }
+
   /** The following tests are for writing normal series in different nums. */
 
   /**

Reply via email to