This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_many_bugs in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 230dd5419242ded39431ba88efa554fa76d4eaea Author: qiaojialin <[email protected]> AuthorDate: Tue Mar 31 19:10:38 2020 +0800 fix recover bugs --- .../main/java/org/apache/iotdb/SessionExample.java | 1 - .../engine/storagegroup/StorageGroupProcessor.java | 4 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 53 ++++++++------- .../db/writelog/recover/SeqTsFileRecoverTest.java | 4 ++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 18 ++---- .../write/writer/RestorableTsFileIOWriter.java | 30 +++++---- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 75 +++++++++++++--------- 7 files changed, 102 insertions(+), 83 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 33e3090..70b5411 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.write.record.RowBatch; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index e006118..97c2be9 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -408,7 +408,7 @@ public class StorageGroupProcessor { getVersionControllerByTimePartitionId(timePartitionId), this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback, true, writer); - workUnsequenceTsFileProcessors + workSequenceTsFileProcessors .put(timePartitionId, tsFileProcessor); tsFileResource.setProcessor(tsFileProcessor); tsFileProcessor.setTimeRangeId(timePartitionId); @@ -437,6 +437,8 @@ public class StorageGroupProcessor { getVersionControllerByTimePartitionId(timePartitionId), this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false, writer); + workUnsequenceTsFileProcessors + .put(timePartitionId, tsFileProcessor); tsFileResource.setProcessor(tsFileProcessor); tsFileProcessor.setTimeRangeId(timePartitionId); writer.makeMetadataVisible(); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index f2ef3ec..4c782af 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -571,9 +571,10 @@ public class PlanExecutor implements IPlanExecutor { file.getAbsolutePath())); } Map<Path, MeasurementSchema> schemaMap = new HashMap<>(); - Map<Path, List<ChunkMetadata>> chunkMetaDataListMap = new HashMap<>(); + + List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList = new ArrayList<>(); try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { - reader.selfCheck(schemaMap, chunkMetaDataListMap, false); + reader.selfCheck(schemaMap, chunkGroupMetadataList, false); } FileLoaderUtils.checkTsFileResource(tsFileResource); @@ -586,7 +587,7 @@ public class PlanExecutor implements IPlanExecutor { //create schemas if they doesn't exist if (plan.isAutoCreateSchema()) { - createSchemaAutomatically(chunkMetaDataListMap, schemaMap, plan.getSgLevel()); + createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel()); } StorageEngine.getInstance().loadNewTsFile(tsFileResource); @@ -596,36 +597,34 @@ public class PlanExecutor implements IPlanExecutor { } } - private void createSchemaAutomatically(Map<Path, List<ChunkMetadata>> chunkMetaDataListMap, + private void createSchemaAutomatically( + List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList, Map<Path, MeasurementSchema> knownSchemas, int sgLevel) throws QueryProcessException, MetadataException { - if (chunkMetaDataListMap.isEmpty()) { + if (chunkGroupMetadataList.isEmpty()) { return; } - for (Entry<Path, List<ChunkMetadata>> entry : chunkMetaDataListMap.entrySet()) { - String device = entry.getKey().getDevice(); + + Set<Path> registeredSeries = new HashSet<>(); + for (Pair<String, List<ChunkMetadata>> chunkGroupMetadata : chunkGroupMetadataList) { + String device = chunkGroupMetadata.left; MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel); - for (ChunkMetadata chunkMetaData : entry.getValue()) { - String measurement = chunkMetaData.getMeasurementUid(); - String fullPath = device + IoTDBConstant.PATH_SEPARATOR + measurement; - MeasurementSchema schema = knownSchemas.get(entry.getKey()); - if (schema == null) { - throw new MetadataException(String - .format("Can not get the schema of measurement [%s]", measurement)); - } - if (!node.hasChild(measurement)) { - try { - mManager.createTimeseries(fullPath, schema.getType(), schema.getEncodingType(), - schema.getCompressor(), Collections.emptyMap()); - } catch (MetadataException e) { - if (!e.getMessage().contains("already exist")) { - throw e; - } + for (ChunkMetadata chunkMetadata : chunkGroupMetadata.right) { + Path series = new Path(chunkGroupMetadata.left, chunkMetadata.getMeasurementUid()); + if (!registeredSeries.contains(series)) { + registeredSeries.add(series); + MeasurementSchema schema = knownSchemas.get(series); + if (schema == null) { + throw new MetadataException(String.format("Can not get the schema of measurement [%s]", + chunkMetadata.getMeasurementUid())); + } + if (!node.hasChild(chunkMetadata.getMeasurementUid())) { + mManager.createTimeseries(series.getFullPath(), schema.getType(), + schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap()); + } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) { + throw new QueryProcessException( + String.format("Current Path is not leaf node. %s", series)); } - } - if (node.getChild(measurement) instanceof InternalMNode) { - throw new QueryProcessException( - String.format("Current Path is not leaf node. %s", fullPath)); } } } diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java index 8480e4a..3c2a38b 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java @@ -215,6 +215,10 @@ public class SeqTsFileRecoverTest { versionController, resource, true, true); ActiveTimeSeriesCounter.getInstance().init(storageGroup); RestorableTsFileIOWriter writer = performer.recover(); + + writer.makeMetadataVisible(); + assertEquals(11, writer.getMetadatasForQuery().size()); + assertTrue(writer.canWrite()); writer.endFile(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 93354a6..af37244 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -563,7 +563,7 @@ public class TsFileSequenceReader implements AutoCloseable { * * @param newSchema @OUT. the measurement schema in the file will be added into this parameter. * (can be null) - * @param chunkMetadataListMap @OUT. the treeMap (Path -> ChunkmetadataList) + * @param chunkGroupMetadataList @OUT. the treeMap (Path -> ChunkmetadataList) * (can be null) * @param fastFinish if true and the file is complete, then newSchema and newMetaData parameter * will be not modified. @@ -572,7 +572,7 @@ public class TsFileSequenceReader implements AutoCloseable { */ public long selfCheck(Map<Path, MeasurementSchema> newSchema, - Map<Path, List<ChunkMetadata>> chunkMetadataListMap, + List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList, boolean fastFinish) throws IOException { File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file); long fileSize; @@ -586,7 +586,8 @@ public class TsFileSequenceReader implements AutoCloseable { TSDataType dataType; long fileOffsetOfChunk; - List<ChunkMetadata> chunks = null; + // ChunkMetadata of current ChunkGroup + List<ChunkMetadata> chunkMetadataList = null; String deviceID; int position = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER @@ -622,7 +623,7 @@ public class TsFileSequenceReader implements AutoCloseable { // this is the first chunk of a new ChunkGroup. if (newChunkGroup) { newChunkGroup = false; - chunks = new ArrayList<>(); + chunkMetadataList = new ArrayList<>(); } fileOffsetOfChunk = this.position() - 1; // if there is something wrong with a chunk, we will drop the whole ChunkGroup @@ -654,7 +655,7 @@ public class TsFileSequenceReader implements AutoCloseable { } currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics); - chunks.add(currentChunk); + chunkMetadataList.add(currentChunk); chunkCnt++; break; case MetaMarker.CHUNK_GROUP_FOOTER: @@ -669,12 +670,7 @@ public class TsFileSequenceReader implements AutoCloseable { newSchema.putIfAbsent(new Path(deviceID, tsSchema.getMeasurementId()), tsSchema); } } - if (chunkMetadataListMap != null) { - for (ChunkMetadata chunk : chunks) { - Path path = new Path(deviceID, chunk.getMeasurementUid()); - chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(chunk); - } - } + chunkGroupMetadataList.add(new Pair<>(deviceID, chunkMetadataList)); newChunkGroup = true; truncatedPosition = this.position(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index c1fa885..3abb4e7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -59,7 +59,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { /** * all chunk group metadata which have been serialized on disk. */ - private Map<String, Map<String, List<ChunkMetadata>>> metadatas = new HashMap<>(); + private Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = new HashMap<>(); /** * @param file a given tsfile path you want to (continue to) write @@ -90,7 +90,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { } // uncompleted file - truncatedPosition = reader.selfCheck(knownSchemas, chunkMetadataListMap, true); + truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true); totalChunkNum = reader.getTotalChunkNum(); if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) { out.close(); @@ -168,8 +168,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId, TSDataType dataType) { List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); - if (metadatas.containsKey(deviceId) && metadatas.get(deviceId).containsKey(measurementId)) { - for (ChunkMetadata chunkMetaData : metadatas.get(deviceId).get(measurementId)) { + if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) { + for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) { // filter: if adevice'sensor is defined as float type, and data has been persistent. // Then someone deletes the timeseries and recreate it with Int type. We have to ignore // all the stale data. @@ -181,6 +181,10 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { return chunkMetadataList; } + public Map<String, Map<String, List<ChunkMetadata>>> getMetadatasForQuery() { + return metadatasForQuery; + } + /** * add all appendChunkMetadatas into memory. After calling this method, other classes can * read these metadata. @@ -194,13 +198,13 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { String deviceId = pair.left; for (ChunkMetadata chunkMetaData : rowMetaDataList) { String measurementId = chunkMetaData.getMeasurementUid(); - if (!metadatas.containsKey(deviceId)) { - metadatas.put(deviceId, new HashMap<>()); + if (!metadatasForQuery.containsKey(deviceId)) { + metadatasForQuery.put(deviceId, new HashMap<>()); } - if (!metadatas.get(deviceId).containsKey(measurementId)) { - metadatas.get(deviceId).put(measurementId, new ArrayList<>()); + if (!metadatasForQuery.get(deviceId).containsKey(measurementId)) { + metadatasForQuery.get(deviceId).put(measurementId, new ArrayList<>()); } - metadatas.get(deviceId).get(measurementId).add(chunkMetaData); + metadatasForQuery.get(deviceId).get(measurementId).add(chunkMetaData); } } } @@ -218,10 +222,10 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { */ private List<Pair<String, List<ChunkMetadata>>> getAppendedRowMetadata() { List<Pair<String, List<ChunkMetadata>>> append = new ArrayList<>(); - if (lastFlushedChunkGroupIndex < chunkGroupInfoList.size()) { - append.addAll(chunkGroupInfoList - .subList(lastFlushedChunkGroupIndex, chunkGroupInfoList.size())); - lastFlushedChunkGroupIndex = chunkGroupInfoList.size(); + if (lastFlushedChunkGroupIndex < chunkGroupMetadataList.size()) { + append.addAll(chunkGroupMetadataList + .subList(lastFlushedChunkGroupIndex, chunkGroupMetadataList.size())); + lastFlushedChunkGroupIndex = chunkGroupMetadataList.size(); } return append; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 0339f5b..d19344e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.tsfile.write.writer; +import java.util.Iterator; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.MetaMarker; @@ -65,14 +66,19 @@ public class TsFileIOWriter { } protected TsFileOutput out; - protected List<Pair<String, List<ChunkMetadata>>> chunkGroupInfoList = new ArrayList<>(); protected boolean canWrite = true; protected int totalChunkNum = 0; protected int invalidChunkNum; protected File file; - protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); - protected Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>(); + + + // current flushed Chunk private ChunkMetadata currentChunkMetadata; + // current flushed ChunkGroup + protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); + // all flushed ChunkGroup, device -> List<ChunkMetadata> + protected List<Pair<String, List<ChunkMetadata>>> chunkGroupMetadataList = new ArrayList<>(); + private long markedPosition; private String deviceId; private long currentChunkGroupStartOffset; @@ -144,7 +150,7 @@ public class TsFileIOWriter { ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(deviceId, dataSize, chunkMetadataList.size()); chunkGroupFooter.serializeTo(out.wrapAsStream()); - chunkGroupInfoList.add(new Pair<>(deviceId, chunkMetadataList)); + chunkGroupMetadataList.add(new Pair<>(deviceId, chunkMetadataList)); logger.debug("end chunk group:{}", chunkMetadataList); deviceId = null; chunkMetadataList = null; @@ -204,8 +210,6 @@ public class TsFileIOWriter { */ public void endCurrentChunk() { chunkMetadataList.add(currentChunkMetadata); - Path path = new Path(deviceId, currentChunkMetadata.getMeasurementUid()); - chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(currentChunkMetadata); currentChunkMetadata = null; totalChunkNum++; } @@ -219,10 +223,17 @@ public class TsFileIOWriter { // serialize the SEPARATOR of MetaData ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream()); - - logger.debug("get time series list:{}", chunkMetadataListMap.keySet()); - - Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(); + + // group ChunkMetadata by series + Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>(); + for (Pair<String, List<ChunkMetadata>> chunkGroupMetadata: chunkGroupMetadataList) { + for (ChunkMetadata chunkMetadata : chunkGroupMetadata.right) { + Path series = new Path(chunkGroupMetadata.left, chunkMetadata.getMeasurementUid()); + chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata); + } + } + + Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap); TsFileMetadata tsFileMetaData = new TsFileMetadata(); tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap); @@ -231,7 +242,9 @@ public class TsFileIOWriter { tsFileMetaData.setInvalidChunkNum(invalidChunkNum); long footerIndex = out.getPosition(); - logger.debug("start to flush the footer,file pos:{}", footerIndex); + if (logger.isDebugEnabled()) { + logger.debug("start to flush the footer,file pos:{}", footerIndex); + } // write TsFileMetaData int size = tsFileMetaData.serializeTo(out.wrapAsStream()); @@ -253,19 +266,18 @@ public class TsFileIOWriter { // close file out.close(); - if (resourceLogger.isInfoEnabled() && file != null) { - resourceLogger.info("{} writer is closed.", file.getName()); + if (resourceLogger.isDebugEnabled() && file != null) { + resourceLogger.debug("{} writer is closed.", file.getName()); } canWrite = false; - chunkMetadataListMap = new TreeMap<>(); - logger.info("output stream is closed"); } /** * Flush ChunkMetadataList and TimeseriesMetaData * @return DeviceMetaDataMap in TsFileMetaData */ - private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList() throws IOException { + private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList( + Map<Path, List<ChunkMetadata>> chunkMetadataListMap) throws IOException { // convert ChunkMetadataList to this field Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new LinkedHashMap<>(); @@ -304,7 +316,7 @@ public class TsFileIOWriter { size += timeseriesMetaData.serializeTo(out.wrapAsStream()); } deviceMetadataMap - .put(device, new Pair<Long, Integer>(offsetOfFirstTimeseriesMetaDataInDevice, size)); + .put(device, new Pair<>(offsetOfFirstTimeseriesMetaDataInDevice, size)); } // return return deviceMetadataMap; @@ -323,11 +335,10 @@ public class TsFileIOWriter { // device -> ChunkMetadataList public Map<String, List<ChunkMetadata>> getDeviceChunkMetadataMap() { Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>(); - for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) { - Path path = entry.getKey(); - String device = path.getDevice(); - deviceChunkMetadataMap.computeIfAbsent(device, k -> new ArrayList<>()) - .addAll(entry.getValue()); + + for (Pair<String, List<ChunkMetadata>> chunkGroupMetadata : chunkGroupMetadataList) { + deviceChunkMetadataMap.computeIfAbsent(chunkGroupMetadata.left, k -> new ArrayList<>()) + .addAll(chunkGroupMetadata.right); } return deviceChunkMetadataMap; } @@ -376,22 +387,26 @@ public class TsFileIOWriter { /** * Remove such ChunkMetadata that its startTime is not in chunkStartTimes */ - public void filterChunks(Map<Path, List<Long>> chunkStartTimes) { Map<Path, Integer> startTimeIdxes = new HashMap<>(); chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0)); - for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) { - List<ChunkMetadata> chunkMetadatas = entry.getValue(); - Path path = entry.getKey(); - int chunkNum = chunkMetadatas.size(); - for (ChunkMetadata chunkMetaData : chunkMetadatas) { + Iterator<Pair<String, List<ChunkMetadata>>> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator(); + while (chunkGroupMetaDataIterator.hasNext()) { + Pair<String, List<ChunkMetadata>> chunkGroupMetaData = chunkGroupMetaDataIterator.next(); + String deviceId = chunkGroupMetaData.left; + int chunkNum = chunkGroupMetaData.right.size(); + Iterator<ChunkMetadata> chunkMetaDataIterator = chunkGroupMetaData.right.iterator(); + while (chunkMetaDataIterator.hasNext()) { + ChunkMetadata chunkMetaData = chunkMetaDataIterator.next(); + Path path = new Path(deviceId, chunkMetaData.getMeasurementUid()); int startTimeIdx = startTimeIdxes.get(path); + List<Long> pathChunkStartTimes = chunkStartTimes.get(path); boolean chunkValid = startTimeIdx < pathChunkStartTimes.size() && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime(); if (!chunkValid) { - chunkMetadatas.remove(chunkMetaData); + chunkMetaDataIterator.remove(); chunkNum--; invalidChunkNum++; } else { @@ -399,7 +414,7 @@ public class TsFileIOWriter { } } if (chunkNum == 0) { - chunkMetadataListMap.remove(path); + chunkGroupMetaDataIterator.remove(); } } }
