This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch fix_concurrent_cluster_query_bug_and_align_by_device_query_locally in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d9f0eb57f1bbcb4aa7747fd8de47c55de30982b Author: LebronAl <[email protected]> AuthorDate: Thu May 13 14:58:40 2021 +0800 fix --- .../cluster/query/ClusterDataQueryExecutor.java | 9 +-------- .../cluster/query/reader/ClusterReaderFactory.java | 23 ++++------------------ .../query/reader/mult/RemoteMultSeriesReader.java | 15 +++----------- .../org/apache/iotdb/db/metadata/PartialPath.java | 15 ++++++++++++++ .../iotdb/db/query/reader/series/SeriesReader.java | 6 +++--- 5 files changed, 26 insertions(+), 42 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java index 168fc2c..8c94a66 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java @@ -31,7 +31,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.VectorPartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter; @@ -130,13 +129,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i); TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i); - String fullPath = partialPath.getFullPath(); - if (partialPath instanceof VectorPartialPath) { - VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath; - if (vectorPartialPath.getSubSensorsPathList().size() == 1) { - fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(); - } - } + String fullPath = PartialPath.getExactFullPath(partialPath); AssignPathManagedMergeReader assignPathManagedMergeReader = new AssignPathManagedMergeReader(fullPath, dataType); for (AbstractMultPointReader multPointReader : multPointReaders) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index eeee119..eb47005 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -316,7 +316,7 @@ public class ClusterReaderFactory { context, dataGroupMember, ascending); - partialPathPointReaderMap.put(partialPath.getFullPath(), seriesPointReader); + partialPathPointReaderMap.put(PartialPath.getExactFullPath(partialPath), seriesPointReader); } if (logger.isDebugEnabled()) { @@ -578,10 +578,7 @@ public class ClusterReaderFactory { Set<String> fullPaths = Sets.newHashSet(); dataSourceInfo .getPartialPaths() - .forEach( - partialPath -> { - fullPaths.add(partialPath.getFullPath()); - }); + .forEach(partialPath -> fullPaths.add(partialPath.getFullPath())); return new MultEmptyReader(fullPaths); } throw new StorageEngineException( @@ -1004,20 +1001,8 @@ public class ClusterReaderFactory { context, dataGroupMember.getHeader(), ascending); - if (partialPath instanceof VectorPartialPath) { - VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath; - if (vectorPartialPath.getSubSensorsPathList().size() == 1) { - partialPathBatchReaderMap.put( - vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(), - new SeriesRawDataBatchReader(seriesReader)); - } else { - partialPathBatchReaderMap.put( - partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader)); - } - } else { // common path - partialPathBatchReaderMap.put( - partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader)); - } + partialPathBatchReaderMap.put( + PartialPath.getExactFullPath(partialPath), new SeriesRawDataBatchReader(seriesReader)); } return new MultBatchReader(partialPathBatchReaderMap); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java index 8b9a61a..d608d7e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java @@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.VectorPartialPath; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -75,22 +74,14 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader { this.cachedBatchs = Maps.newHashMap(); this.pathToDataType = Maps.newHashMap(); for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) { - - PartialPath partialPath = sourceInfo.getPartialPaths().get(i); - String fullPath = partialPath.getFullPath(); - if (partialPath instanceof VectorPartialPath) { - VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath; - if (vectorPartialPath.getSubSensorsPathList().size() == 1) { - fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(); - } - } + String fullPath = PartialPath.getExactFullPath(sourceInfo.getPartialPaths().get(i)); this.cachedBatchs.put(fullPath, new ConcurrentLinkedQueue<>()); this.pathToDataType.put(fullPath, sourceInfo.getDataTypes().get(i)); } } @Override - public boolean hasNextTimeValuePair(String fullPath) throws IOException { + public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException { BatchData batchData = currentBatchDatas.get(fullPath); if (batchData != null && batchData.hasCurrent()) { return true; @@ -108,7 +99,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader { } @Override - public TimeValuePair nextTimeValuePair(String fullPath) throws IOException { + public synchronized TimeValuePair nextTimeValuePair(String fullPath) throws IOException { BatchData batchData = currentBatchDatas.get(fullPath); if ((batchData == null || !batchData.hasCurrent()) && checkPathBatchData(fullPath)) { batchData = cachedBatchs.get(fullPath).poll(); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java index 79d075f..54e4362 100755 --- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java @@ -312,4 +312,19 @@ public class PartialPath extends Path implements Comparable<Path> { } return ret; } + + /** + * If the partialPath is VectorPartialPath and it has only one sub sensor, return the sub sensor's + * full path. Otherwise, return the partialPath's fullPath + */ + public static String getExactFullPath(PartialPath partialPath) { + String fullPath = partialPath.getFullPath(); + if (partialPath instanceof VectorPartialPath) { + VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath; + if (vectorPartialPath.getSubSensorsPathList().size() == 1) { + fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(); + } + } + return fullPath; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 3181d10..f6ac803 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -208,7 +208,7 @@ public class SeriesReader { || firstPageReader != null || mergeReader.hasNextTimeValuePair()) { throw new IOException( - "all cached pages should be consumed first cachedPageReaders.isEmpty() is " + "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " + unSeqPageReaders.isEmpty() + " firstPageReader != null is " + (firstPageReader != null) @@ -269,7 +269,7 @@ public class SeriesReader { || firstPageReader != null || mergeReader.hasNextTimeValuePair()) { throw new IOException( - "all cached pages should be consumed first cachedPageReaders.isEmpty() is " + "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " + unSeqPageReaders.isEmpty() + " firstPageReader != null is " + (firstPageReader != null) @@ -430,7 +430,7 @@ public class SeriesReader { return true; } - // make sure firstPageReader won't be null while the cachedPageReaders has more cached page + // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page // readers while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
