This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch fix_add_future_4_flush_opeartion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to 
refs/heads/fix_add_future_4_flush_opeartion by this push:
     new 45b4ccb  replace FlushStatus by Future
45b4ccb is described below

commit 45b4ccb4bdbe4892af3a406c022291d42fa20add
Author: xiangdong huang <[email protected]>
AuthorDate: Tue Feb 12 21:56:43 2019 +0800

    replace FlushStatus by Future
---
 .../engine/bufferwrite/BufferWriteProcessor.java   | 128 ++++++++---------
 .../db/engine/overflow/io/OverflowProcessor.java   | 157 +++++++++++----------
 .../apache/iotdb/db/engine/utils/FlushStatus.java  |  45 ------
 .../apache/iotdb/db/qp/constant/DatetimeUtils.java |   6 +
 .../bufferwrite/BufferWriteProcessorNewTest.java   |  14 +-
 .../bufferwrite/BufferWriteProcessorTest.java      |  16 ++-
 .../engine/overflow/io/OverflowProcessorTest.java  |   1 -
 7 files changed, 176 insertions(+), 191 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2577ebf..f8d7153 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.engine.bufferwrite;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.Instant;
-import java.time.ZonedDateTime;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -40,8 +38,8 @@ import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -61,9 +59,7 @@ public class BufferWriteProcessor extends Processor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BufferWriteProcessor.class);
   private RestorableTsFileIOWriter writer;
   private FileSchema fileSchema;
-  private volatile FlushStatus flushStatus = new FlushStatus();
-  private volatile Future<Boolean> flushFuture;
-  private volatile boolean isFlush;
+  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
   private ReentrantLock flushQueryLock = new ReentrantLock();
   private AtomicLong memSize = new AtomicLong();
   private long memThreshold = 
TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
@@ -224,7 +220,7 @@ public class BufferWriteProcessor extends Processor {
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
-      if (isFlush) {
+      if (flushMemTable != null) {
         memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, 
measurementId, dataType));
       }
       memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, 
measurementId, dataType));
@@ -244,7 +240,6 @@ public class BufferWriteProcessor extends Processor {
         workMemTable = new PrimitiveMemTable();
       }
     } finally {
-      isFlush = true;
       flushQueryLock.unlock();
     }
   }
@@ -256,11 +251,15 @@ public class BufferWriteProcessor extends Processor {
       flushMemTable = null;
       writer.appendMetadata();
     } finally {
-      isFlush = false;
       flushQueryLock.unlock();
     }
   }
 
+  /**
+   * the caller mast guarantee no other concurrent caller entering this 
function.
+    * @param displayMessage message that will appear in system log.
+   * @return true if successfully.
+   */
   private boolean flushTask(String displayMessage) {
     boolean result;
     long flushStartTime = System.currentTimeMillis();
@@ -285,66 +284,49 @@ public class BufferWriteProcessor extends Processor {
           getProcessorName(), displayMessage, e);
       result = false;
     } finally {
-      synchronized (flushStatus) {
-        flushStatus.setUnFlushing();
-        switchFlushToWork();
-        flushStatus.notifyAll();
-        LOGGER.info("The bufferwrite processor {} ends flushing {}.", 
getProcessorName(),
+      switchFlushToWork();
+      LOGGER.info("The bufferwrite processor {} ends flushing {}.", 
getProcessorName(),
             displayMessage);
-      }
     }
-    long flushEndTime = System.currentTimeMillis();
-    long flushInterval = flushEndTime - flushStartTime;
-    ZonedDateTime startDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
-        IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    ZonedDateTime endDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
-        IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    LOGGER.info(
-        "The bufferwrite processor {} flush {}, start time is {}, flush end 
time is {}, "
-            + "flush time consumption is {}ms",
-        getProcessorName(), displayMessage, startDateTime, endDateTime, 
flushInterval);
+    if (LOGGER.isInfoEnabled()) {
+      long flushEndTime = System.currentTimeMillis();
+      LOGGER.info(
+          "The bufferwrite processor {} flush {}, start time is {}, flush end 
time is {}, "
+              + "flush time consumption is {}ms",
+          getProcessorName(), displayMessage,
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+          flushEndTime - flushStartTime);
+    }
     return result;
   }
 
+  // keyword synchronized is added in this method, so that only one flush task 
can be submitted now.
   @Override
-  public Future<Boolean> flush() throws IOException {
+  public synchronized Future<Boolean> flush() throws IOException {
     // statistic information for flush
     if (lastFlushTime > 0) {
-      long thisFlushTime = System.currentTimeMillis();
-      long flushTimeInterval = thisFlushTime - lastFlushTime;
-      ZonedDateTime lastDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      ZonedDateTime thisDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFlushTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      LOGGER.info(
-          "The bufferwrite processor {}: last flush time is {}, this flush 
time is {}, "
-              + "flush time interval is {}s",
-          getProcessorName(), lastDateTime, thisDateTime, flushTimeInterval / 
1000);
+      if (LOGGER.isInfoEnabled()) {
+        long thisFlushTime = System.currentTimeMillis();
+        LOGGER.info(
+            "The bufferwrite processor {}: last flush time is {}, this flush 
time is {}, "
+                + "flush time interval is {}s", getProcessorName(),
+            DatetimeUtils.convertMillsecondToZonedDateTime(lastFlushTime),
+            DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
+            (thisFlushTime - lastFlushTime) / 1000);
+      }
     }
     lastFlushTime = System.currentTimeMillis();
     // check value count
     if (valueCount > 0) {
       // waiting for the end of last flush operation.
-//      synchronized (flushStatus) {
-//        while (flushStatus.isFlushing()) {
-//          try {
-//            flushStatus.wait();
-//          } catch (InterruptedException e) {
-//            LOGGER.error(
-//                "Encounter an interrupt error when waitting for the 
flushing, "
-//                    + "the bufferwrite processor is {}.",
-//                getProcessorName(), e);
-//            Thread.currentThread().interrupt();
-//          }
-//        }
-//      }
       try {
         flushFuture.get();
       } catch (InterruptedException | ExecutionException e) {
-        e.printStackTrace();
         LOGGER.error("Encounter an interrupt error when waitting for the 
flushing, "
                 + "the bufferwrite processor is {}.",
             getProcessorName(), e);
+        Thread.currentThread().interrupt();
       }
       // update the lastUpdatetime, prepare for flush
       try {
@@ -357,15 +339,15 @@ public class BufferWriteProcessor extends Processor {
         logNode.notifyStartFlush();
       }
       valueCount = 0;
-      flushStatus.setFlushing();
       switchWorkToFlush();
       BasicMemController.getInstance().reportFree(this, memSize.get());
       memSize.set(0);
       // switch
-      return FlushManager.getInstance().submit(() -> 
flushTask("asynchronously"));
+      flushFuture = FlushManager.getInstance().submit(() -> 
flushTask("asynchronously"));
     } else{
-      return new ImmediateFuture<>(true);
+      flushFuture = new ImmediateFuture<>(true);
     }
+    return flushFuture;
   }
 
   @Override
@@ -386,16 +368,16 @@ public class BufferWriteProcessor extends Processor {
       // flush the changed information for filenode
       filenodeFlushAction.act();
       // delete the restore for this bufferwrite processor
-      long closeEndTime = System.currentTimeMillis();
-      long closeInterval = closeEndTime - closeStartTime;
-      ZonedDateTime startDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      ZonedDateTime endDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeEndTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
-      LOGGER.info(
-          "Close bufferwrite processor {}, the file name is {}, start time is 
{}, end time is {}, "
-              + "time consumption is {}ms",
-          getProcessorName(), fileName, startDateTime, endDateTime, 
closeInterval);
+      if (LOGGER.isInfoEnabled()) {
+        long closeEndTime = System.currentTimeMillis();
+        LOGGER.info(
+            "Close bufferwrite processor {}, the file name is {}, start time 
is {}, end time is {}, "
+                + "time consumption is {}ms",
+            getProcessorName(), fileName,
+            DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+            DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+            closeEndTime - closeStartTime);
+      }
     } catch (IOException e) {
       LOGGER.error("Close the bufferwrite processor error, the bufferwrite is 
{}.",
           getProcessorName(), e);
@@ -418,9 +400,7 @@ public class BufferWriteProcessor extends Processor {
    * @return True if flushing
    */
   public boolean isFlush() {
-    synchronized (flushStatus) {
-      return flushStatus.isFlushing();
-    }
+    return !flushFuture.isDone();
   }
 
   /**
@@ -503,4 +483,20 @@ public class BufferWriteProcessor extends Processor {
   public WriteLogNode getLogNode() {
     return logNode;
   }
+
+  /**
+   * used for test. We can know when the flush() is called.
+   * @return the last flush() time.
+   */
+  public long getLastFlushTime() {
+    return lastFlushTime;
+  }
+
+  /**
+   * used for test. We can block to wait for finishing flushing.
+   * @return the future of the flush() task.
+   */
+  public Future<Boolean> getFlushFuture() {
+    return flushFuture;
+  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 07c5161..3e53acd 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -27,10 +27,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -46,8 +46,8 @@ import 
org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -68,14 +68,13 @@ public class OverflowProcessor extends Processor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OverflowProcessor.class);
   private static final IoTDBConfig TsFileDBConf = 
IoTDBDescriptor.getInstance().getConfig();
-  private static final TSFileConfig TsFileConf = 
TSFileDescriptor.getInstance().getConfig();
   private OverflowResource workResource;
   private OverflowResource mergeResource;
 
   private OverflowSupport workSupport;
   private OverflowSupport flushSupport;
 
-  private volatile FlushStatus flushStatus = new FlushStatus();
+  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
   private volatile boolean isMerge;
   private int valueCount;
   private String parentPath;
@@ -298,15 +297,20 @@ public class OverflowProcessor extends Processor {
                                                           TSDataType dataType) 
{
 
     MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
-    if (flushStatus.isFlushing()) {
+    queryFlushLock.lock();
+    try {
+      if (flushSupport != null) {
+        memSeriesLazyMerger
+            .addMemSeries(
+                flushSupport.queryOverflowInsertInMemory(deviceId, 
measurementId, dataType));
+      }
       memSeriesLazyMerger
-              .addMemSeries(
-                      flushSupport.queryOverflowInsertInMemory(deviceId, 
measurementId, dataType));
+          .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, 
measurementId,
+              dataType));
+      return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
+    } finally {
+      queryFlushLock.unlock();
     }
-    memSeriesLazyMerger
-            .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, 
measurementId,
-                    dataType));
-    return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
   }
 
   /**
@@ -372,10 +376,9 @@ public class OverflowProcessor extends Processor {
     if (!isMerge) {
       return new Pair<>(null, null);
     }
-    Pair<String, List<ChunkMetaData>> pair = new Pair<>(
+    return  new Pair<>(
             mergeResource.getInsertFilePath(),
             mergeResource.getInsertMetadatas(deviceId, measurementId, 
dataType));
-    return pair;
   }
 
   private void switchWorkToFlush() {
@@ -425,12 +428,10 @@ public class OverflowProcessor extends Processor {
   }
 
   public boolean isFlush() {
-    synchronized (flushStatus) {
-      return flushStatus.isFlushing();
-    }
+    return !flushFuture.isDone();
   }
 
-  private boolean flushOperation(String displayMessage) {
+  private boolean flushTask(String displayMessage) {
     boolean result;
     long flushStartTime = System.currentTimeMillis();
     try {
@@ -450,29 +451,27 @@ public class OverflowProcessor extends Processor {
               getProcessorName(), displayMessage, 
Thread.currentThread().getName(), e);
       result = false;
     } finally {
-      synchronized (flushStatus) {
-        flushStatus.setUnFlushing();
         // switch from flush to work.
         switchFlushToWork();
-        flushStatus.notifyAll();
-      }
     }
     // log flush time
-    LOGGER.info("The overflow processor {} ends flushing {}.", 
getProcessorName(), displayMessage);
-    long flushEndTime = System.currentTimeMillis();
-    long timeInterval = flushEndTime - flushStartTime;
-    ZonedDateTime startDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    ZonedDateTime endDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    LOGGER.info(
-            "The overflow processor {} flush {}, start time is {}, flush end 
time is {}," +
-                    " time consumption is {}ms",
-            getProcessorName(), displayMessage, startDateTime, endDateTime, 
timeInterval);
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER
+          .info("The overflow processor {} ends flushing {}.", 
getProcessorName(), displayMessage);
+      long flushEndTime = System.currentTimeMillis();
+      LOGGER.info(
+          "The overflow processor {} flush {}, start time is {}, flush end 
time is {}," +
+              " time consumption is {}ms",
+          getProcessorName(), displayMessage,
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+          flushEndTime - flushStartTime);
+    }
     return result;
   }
 
-  private Future<Boolean> flush(boolean synchronization) throws 
OverflowProcessorException {
+  @Override
+  public synchronized Future<Boolean> flush() throws IOException {
     // statistic information for flush
     if (lastFlushTime > 0) {
       long thisFLushTime = System.currentTimeMillis();
@@ -489,21 +488,20 @@ public class OverflowProcessor extends Processor {
     lastFlushTime = System.currentTimeMillis();
     // value count
     if (valueCount > 0) {
-      synchronized (flushStatus) {
-        while (flushStatus.isFlushing()) {
-          try {
-            flushStatus.wait();
-          } catch (InterruptedException e) {
-            LOGGER.error("Waiting the flushstate error in flush row group to 
store.", e);
-          }
-        }
+      try {
+        flushFuture.get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error("Encounter an interrupt error when waitting for the 
flushing, "
+                + "the bufferwrite processor is {}.",
+            getProcessorName(), e);
+        Thread.currentThread().interrupt();
       }
       try {
         // backup newIntervalFile list and emptyIntervalFileNode
         overflowFlushAction.act();
       } catch (Exception e) {
         LOGGER.error("Flush the overflow rowGroup to file faied, when 
overflowFlushAction act");
-        throw new OverflowProcessorException(e);
+        throw new IOException(e);
       }
 
       if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
@@ -518,47 +516,42 @@ public class OverflowProcessor extends Processor {
       memSize.set(0);
       valueCount = 0;
       // switch from work to flush
-      flushStatus.setFlushing();
       switchWorkToFlush();
-      if (synchronization) {
-        return new ImmediateFuture<>(flushOperation("synchronously"));
-      } else {
-        return FlushManager.getInstance().submit( () ->
-            flushOperation("asynchronously"));
-      }
+      flushFuture = FlushManager.getInstance().submit( () ->
+          flushTask("asynchronously"));
     } else {
-      return new ImmediateFuture(true);
+      flushFuture = new ImmediateFuture(true);
     }
+    return flushFuture;
 
   }
 
   @Override
-  public Future<Boolean> flush() throws IOException {
-    try {
-      return flush(false);
-    } catch (OverflowProcessorException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
   public void close() throws OverflowProcessorException {
     LOGGER.info("The overflow processor {} starts close operation.", 
getProcessorName());
     long closeStartTime = System.currentTimeMillis();
     // flush data
-    flush(true);
-    LOGGER.info("The overflow processor {} ends close operation.", 
getProcessorName());
-    // log close time
-    long closeEndTime = System.currentTimeMillis();
-    long timeInterval = closeEndTime - closeStartTime;
-    ZonedDateTime startDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    ZonedDateTime endDateTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    LOGGER.info(
-            "The close operation of overflow processor {} starts at {} and 
ends at {}."
-                    + " It comsumes {}ms.",
-            getProcessorName(), startDateTime, endDateTime, timeInterval);
+    try {
+      flush().get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Encounter an interrupt error when waitting for the 
flushing, "
+              + "the bufferwrite processor is {}.",
+          getProcessorName(), e);
+      Thread.currentThread().interrupt();
+    } catch (IOException e) {
+      throw new OverflowProcessorException(e);
+    }
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("The overflow processor {} ends close operation.", 
getProcessorName());
+      // log close time
+      long closeEndTime = System.currentTimeMillis();
+      LOGGER.info(
+          "The close operation of overflow processor {} starts at {} and ends 
at {}."
+              + " It comsumes {}ms.",
+          getProcessorName(), 
DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+          DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+          closeEndTime - closeStartTime);
+    }
   }
 
   public void clear() throws IOException {
@@ -659,7 +652,7 @@ public class OverflowProcessor extends Processor {
             Objects.equals(mergeResource, that.mergeResource) &&
             Objects.equals(workSupport, that.workSupport) &&
             Objects.equals(flushSupport, that.flushSupport) &&
-            Objects.equals(flushStatus, that.flushStatus) &&
+            Objects.equals(flushFuture, that.flushFuture) &&
             Objects.equals(parentPath, that.parentPath) &&
             Objects.equals(dataPahtCount, that.dataPahtCount) &&
             Objects.equals(queryFlushLock, that.queryFlushLock) &&
@@ -673,8 +666,24 @@ public class OverflowProcessor extends Processor {
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), workResource, mergeResource, 
workSupport,
-            flushSupport, flushStatus, isMerge, valueCount, parentPath, 
lastFlushTime,
+            flushSupport, flushFuture, isMerge, valueCount, parentPath, 
lastFlushTime,
             dataPahtCount, queryFlushLock, overflowFlushAction, 
filenodeFlushAction, fileSchema,
             memThreshold, memSize, logNode);
   }
+
+  /**
+   * used for test. We can block to wait for finishing flushing.
+   * @return the future of the flush() task.
+   */
+  public Future<Boolean> getFlushFuture() {
+    return flushFuture;
+  }
+
+  /**
+   * used for test. We can know when the flush() is called.
+   * @return the last flush() time.
+   */
+  public long getLastFlushTime() {
+    return lastFlushTime;
+  }
 }
\ No newline at end of file
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
deleted file mode 100644
index f437c96..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.utils;
-
-/**
- * This class is used to represent the state of flush. It's can be used in the 
bufferwrite
- * flush{@code SequenceFileManager} and overflow flush{@code 
OverFlowProcessor}.
- */
-public class FlushStatus {
-
-  private boolean isFlushing;
-
-  public FlushStatus() {
-    this.isFlushing = false;
-  }
-
-  public boolean isFlushing() {
-    return isFlushing;
-  }
-
-  public void setFlushing() {
-    this.isFlushing = true;
-  }
-
-  public void setUnFlushing() {
-    this.isFlushing = false;
-  }
-
-}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
index 5011071..266c439 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
@@ -27,6 +27,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.SignStyle;
 import java.time.temporal.ChronoField;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 
 public class DatetimeUtils {
@@ -206,4 +207,9 @@ public class DatetimeUtils {
   public static ZoneOffset toZoneOffset(ZoneId zoneId) {
     return zoneId.getRules().getOffset(Instant.now());
   }
+
+  public static ZonedDateTime convertMillsecondToZonedDateTime(long 
millisecond) {
+    return ZonedDateTime.ofInstant(Instant.ofEpochMilli(millisecond),
+        IoTDBDescriptor.getInstance().getConfig().getZoneID());
+  }
 }
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 0efe087..04046b8 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -26,7 +26,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -39,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -123,13 +126,18 @@ public class BufferWriteProcessorNewTest {
       assertEquals(num, timeValuePair.getValue().getInt());
     }
     assertEquals(false, bufferwrite.isFlush());
+    long lastFlushTime = bufferwrite.getLastFlushTime();
     // flush asynchronously
     bufferwrite.flush();
-    assertEquals(true, bufferwrite.isFlush());
+    assertEquals(true, bufferwrite.getLastFlushTime() != lastFlushTime);
     assertEquals(true, bufferwrite.canBeClosed());
     // waiting for the end of flush.
-    while (bufferwrite.isFlush()) {
-      TimeUnit.SECONDS.sleep(1);
+    try {
+      bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      //because UT uses a mock flush operation, 10 seconds should be enough.
+      Assert.fail("mock flush spends more than 10 seconds... "
+          + "Please modify the value or change a better test environment");
     }
     pair = bufferwrite.queryBufferWriteData(processorName, measurementId, 
dataType);
     left = pair.left;
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 95e0d55..6a2e474 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.bufferwrite;
 
 import static org.junit.Assert.assertEquals;
 
+import ch.qos.logback.core.util.TimeUtil;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -27,7 +28,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.PathUtils;
@@ -194,14 +197,23 @@ public class BufferWriteProcessorTest {
     assertEquals(0, bufferwrite.memoryUsage());
     assertEquals(TsFileIOWriter.magicStringBytes.length, 
bufferwrite.getFileSize());
     assertEquals(0, bufferwrite.getMetaSize());
+    long lastFlushTime = bufferwrite.getLastFlushTime();
     for (int i = 1; i <= 85; i++) {
       bufferwrite.write(deviceId, measurementId, i, dataType, 
String.valueOf(i));
       assertEquals(i * 12, bufferwrite.memoryUsage());
     }
+    assertEquals(lastFlushTime, bufferwrite.getLastFlushTime());
     bufferwrite.write(deviceId, measurementId, 86, dataType, 
String.valueOf(86));
-    assertEquals(true, bufferwrite.isFlush());
+    //assert a flush() is called.
+    assertEquals(false, bufferwrite.getLastFlushTime()==lastFlushTime);
     // sleep to the end of flush
-    TimeUnit.SECONDS.sleep(2);
+    try {
+      bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      //because UT uses a mock flush operation, 10 seconds should be enough.
+      Assert.fail("mock flush spends more than 10 seconds... "
+          + "Please modify the value or change a better test environment");
+    }
     assertEquals(false, bufferwrite.isFlush());
     assertEquals(0, bufferwrite.memoryUsage());
     // query result
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 43469e2..bb2bddb 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -89,7 +89,6 @@ public class OverflowProcessorTest {
     assertEquals(0,
         
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
     processor.flush();
-    assertEquals(false, processor.isFlush());
     assertEquals(false, processor.isMerge());
     // write insert data
     OverflowTestUtils.produceInsertData(processor);

Reply via email to