This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new fc5ce31 refactor the interaction with MManager while executing
AlignByDevicePlan (#4359)
fc5ce31 is described below
commit fc5ce315498f4cf3906cb07b5b30d847365b7ac4
Author: liuminghui233 <[email protected]>
AuthorDate: Thu Nov 11 19:15:27 2021 +0800
refactor the interaction with MManager while executing AlignByDevicePlan
(#4359)
---
.../db/query/dataset/AlignByDeviceDataSet.java | 47 +++++-----------------
1 file changed, 10 insertions(+), 37 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index d2ee07a..0486dfe 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -42,13 +41,10 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -135,13 +131,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
while (deviceIterator.hasNext()) {
currentDevice = deviceIterator.next();
// get all measurements of current device
- Set<String> measurementOfGivenDevice =
getMeasurementsUnderGivenDevice(currentDevice);
+ Map<String, MeasurementPath> measurementToPathMap =
+ getMeasurementsUnderGivenDevice(currentDevice);
+ Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
// extract paths and aggregations queried from all measurements
// executeColumns is for calculating rowRecord
executeColumns = new ArrayList<>();
List<PartialPath> executePaths = new ArrayList<>();
- List<TSDataType> tsDataTypes = new ArrayList<>();
List<String> executeAggregations = new ArrayList<>();
for (Entry<String, MeasurementInfo> entry :
measurementInfoMap.entrySet()) {
if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
@@ -157,8 +154,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
if (measurementOfGivenDevice.contains(measurement)) {
executeColumns.add(column);
- executePaths.add(transformPath(currentDevice, measurement));
-
tsDataTypes.add(measurementInfoMap.get(column).getMeasurementDataType());
+ executePaths.add(measurementToPathMap.get(measurement));
}
}
@@ -220,41 +216,18 @@ public class AlignByDeviceDataSet extends QueryDataSet {
return false;
}
- /**
- * Get all measurements under given device. For a vectorMeasurementSchema,
we return its
- * measurementId + all subMeasurement. e.g. schema: vector1[s1, s2], return
["vector1.s1",
- * "vector1.s2"].
- */
- protected Set<String> getMeasurementsUnderGivenDevice(PartialPath device)
throws IOException {
+ /** Get all measurements under given device. */
+ protected Map<String, MeasurementPath>
getMeasurementsUnderGivenDevice(PartialPath device)
+ throws IOException {
try {
- Set<String> res = new HashSet<>();
// TODO: Implement this method in Cluster MManager
+ Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
List<MeasurementPath> measurementPaths =
IoTDB.metaManager.getAllMeasurementByDevicePath(device);
for (MeasurementPath measurementPath : measurementPaths) {
- res.add(measurementPath.getMeasurement());
- }
-
- return res;
- } catch (MetadataException e) {
- throw new IOException("Cannot get node from " + device, e);
- }
- }
-
- /**
- * Attention. For a vectorPath(root.sg.d1.vector1.s1), device is root.sg.d1,
measurement is
- * "vector1.s1".
- */
- private PartialPath transformPath(PartialPath device, String measurement)
throws IOException {
- try {
- PartialPath fullPath = new PartialPath(device.getFullPath(),
measurement);
- IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(fullPath);
- if (schema instanceof UnaryMeasurementSchema) {
- return new MeasurementPath(device.getFullPath(), measurement, schema);
- } else {
- String vectorPath = fullPath.getDevice();
- return new AlignedPath(vectorPath, fullPath.getMeasurement());
+ measurementToPathMap.put(measurementPath.getMeasurement(),
measurementPath);
}
+ return measurementToPathMap;
} catch (MetadataException e) {
throw new IOException("Cannot get node from " + device, e);
}