This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_duplicated_overlap_merge in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0df35ae8ac4966215ac2451111003b0628784451 Author: 江天 <[email protected]> AuthorDate: Mon Oct 21 11:29:57 2019 +0800 fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache --- .../universal/CachedPriorityMergeReader.java | 32 +++-- .../reader/universal/PriorityMergeReader.java | 2 +- .../{MergeTest.java => MergeOverLapTest.java} | 146 +++++++++------------ .../apache/iotdb/db/engine/merge/MergeTest.java | 4 +- 4 files changed, 81 insertions(+), 103 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java index e28d6f6..61aeea5 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java @@ -34,10 +34,10 @@ public class CachedPriorityMergeReader extends PriorityMergeReader { private TimeValuePair[] timeValuePairCache = new TimeValuePair[CACHE_SIZE]; private int cacheLimit = 0; private int cacheIdx = 0; - private TSDataType dataType; + + private Long lastTimestamp = null; public CachedPriorityMergeReader(TSDataType dataType) { - this.dataType = dataType; for (int i = 0; i < CACHE_SIZE; i++) { timeValuePairCache[i] = TimeValuePairUtils.getEmptyTimeValuePair(dataType); } @@ -53,22 +53,19 @@ public class CachedPriorityMergeReader extends PriorityMergeReader { cacheIdx = 0; while (!heap.isEmpty() && cacheLimit < CACHE_SIZE) { Element top = heap.poll(); - if (cacheLimit == 0 || top.currTime() != timeValuePairCache[cacheLimit - 1].getTimestamp()) { + if (lastTimestamp == null || top.currTime() != lastTimestamp) { TimeValuePairUtils.setTimeValuePair(top.timeValuePair, timeValuePairCache[cacheLimit++]); - if (top.hasNext()) { - top.next(); - heap.add(top); - } else { - top.close(); - } - } else if (top.currTime() == timeValuePairCache[cacheLimit - 1].getTimestamp()) { - if (top.hasNext()) { - top.next(); - heap.add(top); - } else { - top.close(); + lastTimestamp = top.currTime(); + while (heap.peek() != null && heap.peek().currTime() == lastTimestamp) { + heap.poll(); } } + if (top.hasNext()) { + top.next(); + heap.add(top); + } else { + top.close(); + } } } @@ -85,11 +82,12 @@ public class CachedPriorityMergeReader extends PriorityMergeReader { } @Override - public TimeValuePair current() { + public TimeValuePair current() throws IOException { if (0 <= cacheIdx && cacheIdx < cacheLimit) { return timeValuePairCache[cacheIdx]; } else { - return heap.peek().timeValuePair; + fetch(); + return timeValuePairCache[cacheIdx]; } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java index c3d8096..1e1e5d1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java @@ -76,7 +76,7 @@ public class PriorityMergeReader implements IPointReader { } @Override - public TimeValuePair current() { + public TimeValuePair current() throws IOException { return heap.peek().timeValuePair; } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java similarity index 51% copy from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java copy to server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java index 4d7978b..fd07863 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java @@ -15,105 +15,56 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. + * */ package org.apache.iotdb.db.engine.merge; -import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR; +import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache; -import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache; -import org.apache.iotdb.db.engine.merge.manage.MergeManager; +import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.task.MergeTask; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.MetadataErrorException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Before; +import org.junit.Test; -abstract class MergeTest { - - static final String MERGE_TEST_SG = "root.mergeTest"; - - int seqFileNum = 5; - int unseqFileNum = 5; - int measurementNum = 10; - int deviceNum = 10; - long ptNum = 100; - long flushInterval = 20; - TSEncoding encoding = TSEncoding.PLAIN; - - String[] deviceIds; - MeasurementSchema[] measurementSchemas; - - List<TsFileResource> seqResources = new ArrayList<>(); - List<TsFileResource> unseqResources = new ArrayList<>(); +public class MergeOverLapTest extends MergeTest { - private int prevMergeChunkThreshold; + private File tempSGDir; @Before public void setUp() throws IOException, WriteProcessException, MetadataErrorException { - MManager.getInstance().init(); - prevMergeChunkThreshold = - IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold(); - IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1); - prepareSeries(); - prepareFiles(seqFileNum, unseqFileNum); - MergeManager.getINSTANCE().start(); + ptNum = 1000; + super.setUp(); + tempSGDir = new File("tempSG"); + tempSGDir.mkdirs(); } @After public void tearDown() throws IOException, StorageEngineException { - removeFiles(); - seqResources.clear(); - unseqResources.clear(); - IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold); - TsFileMetaDataCache.getInstance().clear(); - DeviceMetaDataCache.getInstance().clear(); - MManager.getInstance().clear(); - EnvironmentUtils.cleanAllDir(); - MergeManager.getINSTANCE().stop(); - } - - private void prepareSeries() throws MetadataErrorException { - measurementSchemas = new MeasurementSchema[measurementNum]; - for (int i = 0; i < measurementNum; i++) { - measurementSchemas[i] = new MeasurementSchema("sensor" + i, TSDataType.DOUBLE, - encoding, CompressionType.UNCOMPRESSED); - } - deviceIds = new String[deviceNum]; - for (int i = 0; i < deviceNum; i++) { - deviceIds[i] = MERGE_TEST_SG + PATH_SEPARATOR + "device" + i; - } - MManager.getInstance().setStorageGroupToMTree(MERGE_TEST_SG); - for (String device : deviceIds) { - for (MeasurementSchema measurementSchema : measurementSchemas) { - MManager.getInstance().addPathToMTree( - device + PATH_SEPARATOR + measurementSchema.getMeasurementId(), measurementSchema - .getType(), measurementSchema.getEncodingType(), measurementSchema.getCompressor(), - Collections.emptyMap()); - } - } + super.tearDown(); + FileUtils.deleteDirectory(tempSGDir); } - private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { + @Override + void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { for (int i = 0; i < seqFileNum; i++) { File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile"); TsFileResource tsFileResource = new TsFileResource(file); @@ -132,18 +83,7 @@ abstract class MergeTest { prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000); } - private void removeFiles() throws IOException { - for (TsFileResource tsFileResource : seqResources) { - tsFileResource.remove(); - } - for (TsFileResource tsFileResource : unseqResources) { - tsFileResource.remove(); - } - FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); - FileReaderManager.getInstance().stop(); - } - - private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, + void prepareUnseqFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) throws IOException, WriteProcessException { TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile()); @@ -161,10 +101,50 @@ abstract class MergeTest { tsFileResource.updateStartTime(deviceIds[j], i); tsFileResource.updateEndTime(deviceIds[j], i); } + // insert overlapping tuples + if ((i + 1) % 100 == 0) { + for (int j = 0; j < deviceNum; j++) { + TSRecord record = new TSRecord(i, deviceIds[j]); + for (int k = 0; k < measurementNum; k++) { + record.addTuple(DataPoint.getDataPoint(measurementSchemas[k].getType(), + measurementSchemas[k].getMeasurementId(), String.valueOf(i + valueOffset))); + } + fileWriter.write(record); + tsFileResource.updateStartTime(deviceIds[j], i); + tsFileResource.updateEndTime(deviceIds[j], i); + } + } if ((i + 1) % flushInterval == 0) { fileWriter.flushForTest(); } } fileWriter.close(); } -} \ No newline at end of file + + @Test + public void testFullMerge() throws Exception { + MergeTask mergeTask = + new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test", + true, 1, MERGE_TEST_SG); + mergeTask.call(); + + QueryContext context = new QueryContext(); + Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()); + SeqResourceIterateReader tsFilesReader = new SeqResourceIterateReader(path, + Collections.singletonList(seqResources.get(0)), + null, context); + int cnt = 0; + try { + while (tsFilesReader.hasNext()) { + BatchData batchData = tsFilesReader.nextBatch(); + for (int i = 0; i < batchData.length(); i++) { + cnt ++; + assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001); + } + } + assertEquals(1000, cnt); + } finally { + tsFilesReader.close(); + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java index 4d7978b..bc3a7b6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java @@ -113,7 +113,7 @@ abstract class MergeTest { } } - private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { + void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { for (int i = 0; i < seqFileNum; i++) { File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile"); TsFileResource tsFileResource = new TsFileResource(file); @@ -143,7 +143,7 @@ abstract class MergeTest { FileReaderManager.getInstance().stop(); } - private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, + void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) throws IOException, WriteProcessException { TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());
