This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/polish_node in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f1548d18c235b7d0a9c24dec0b50075c92ea7919 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Mar 28 23:02:58 2022 +0800 add fake interface --- .../commons/partition/DataRegionReplicaSet.java | 8 ++ .../iotdb/db/mpp/common/schematree/SchemaTree.java | 40 +++++++- .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 4 +- .../mpp/sql/analyze/FakePartitionFetcherImpl.java | 107 +++++++++++++++++++++ .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 30 ++++++ .../db/mpp/sql/planner/DistributionPlanner.java | 8 +- .../planner/plan/node/process/ExchangeNode.java | 4 +- .../db/mpp/sql/plan/DistributionPlannerTest.java | 23 ++++- .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 46 +++++++++ 9 files changed, 256 insertions(+), 14 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java index 6ab4fbf..7346363 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java @@ -50,4 +50,12 @@ public class DataRegionReplicaSet { public String toString() { return String.format("%s:%s", Id, endPointList); } + + public int hashCode() { + return toString().hashCode(); + } + + public boolean equals(Object obj) { + return obj instanceof DataRegionReplicaSet && obj.toString().equals(toString()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java index 221a115..6412a6e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.common.schematree; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.tsfile.utils.Pair; @@ -27,7 +28,9 @@ import org.apache.iotdb.tsfile.utils.Pair; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class SchemaTree { @@ -36,12 +39,47 @@ public class SchemaTree { /** * Return all measurement paths for given path pattern and filter the result by slimit and offset. * - * @param pathPattern can be a pattern or a full path of timeseries. + * @param pathPattern can be a pattern or a full path of timeseries. * @param isPrefixMatch if true, the path pattern is used to match prefix path * @return Left: all measurement paths; Right: remaining series offset */ public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths( PartialPath pathPattern, int slimit, int soffset, boolean isPrefixMatch) { + // TODO: (xingtanzjr) we mock some results here to test the whole procedure + try { + String[] paths = new String[]{ + "root.sg.d1.s1", + "root.sg.d1.s2", + "root.sg.d22.s1", + "root.sg.d22.s2", + "root.sg.d333.s1", + "root.sg.d333.s2", + }; + + List<MeasurementPath> result = new ArrayList<>(); + String target = pathPattern.getFullPath(); + StringBuilder noStar = new StringBuilder(); + boolean lastCharIsStar = false; + for(int i = 0 ; i < target.length(); i ++) { + char c = target.charAt(i); + if (c == '*' || (lastCharIsStar && c == '.')) { + lastCharIsStar = c == '*'; + continue; + } + lastCharIsStar = false; + noStar.append(String.valueOf(c)); + } + + for (String path : paths) { + if (path.contains(noStar)) { + result.add(new MeasurementPath(path)); + } + } + return new Pair<>(result, 0); + + } catch (IllegalPathException e) { + e.printStackTrace(); + } return new Pair<>(new ArrayList<>(), 0); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java index 942d26d..2b905f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java @@ -60,9 +60,9 @@ public class Analyzer { private final MPPQueryContext context; // TODO need to use factory to decide standalone or cluster - private final IPartitionFetcher partitionFetcher = StandalonePartitionFetcher.getInstance(); + private final IPartitionFetcher partitionFetcher = new FakePartitionFetcherImpl(); // TODO need to use factory to decide standalone or cluster - private final ISchemaFetcher schemaFetcher = StandaloneSchemaFetcher.getInstance(); + private final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl(); public Analyzer(MPPQueryContext context) { this.context = context; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java new file mode 100644 index 0000000..ecee1a3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.sql.analyze; + +import org.apache.iotdb.commons.partition.*; +import org.apache.iotdb.service.rpc.thrift.EndPoint; + +import java.util.*; + +public class FakePartitionFetcherImpl implements IPartitionFetcher { + @Override + public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) { + return null; + } + + @Override + public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) { + String device1 = "root.sg.d1"; + String device2 = "root.sg.d22"; + String device3 = "root.sg.d333"; + + DataPartitionInfo dataPartitionInfo = new DataPartitionInfo(); + Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> + dataPartitionMap = new HashMap<>(); + Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap = + new HashMap<>(); + + List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>(); + d1DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(1), + Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000)))); + d1DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(2), + Arrays.asList(new EndPoint("192.0.2.1", 9000), new EndPoint("192.0.2.2", 9000)))); + Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>(); + d1DataRegionMap.put(new TimePartitionId(), d1DataRegions); + + List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>(); + d2DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(3), + Arrays.asList(new EndPoint("192.0.3.1", 9000), new EndPoint("192.0.3.2", 9000)))); + Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>(); + d2DataRegionMap.put(new TimePartitionId(), d2DataRegions); + + List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>(); + d3DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(1), + Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000)))); + d3DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(4), + Arrays.asList(new EndPoint("192.0.4.1", 9000), new EndPoint("192.0.4.2", 9000)))); + Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new HashMap<>(); + d3DataRegionMap.put(new TimePartitionId(), d3DataRegions); + + sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap); + sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap); + sgPartitionMap.put(new DeviceGroupId(device3.length()), d3DataRegionMap); + + dataPartitionMap.put("root.sg", sgPartitionMap); + + dataPartitionInfo.setDataPartitionMap(dataPartitionMap); + + return dataPartitionInfo; + } + + @Override + public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) { + return null; + } + + @Override + public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) { + return null; + } + + @Override + public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) { + return null; + } + + @Override + public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) { + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java new file mode 100644 index 0000000..44dd06e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.sql.analyze; + +import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; +import org.apache.iotdb.db.mpp.common.schematree.SchemaTree; + +public class FakeSchemaFetcherImpl implements ISchemaFetcher{ + @Override + public SchemaTree fetchSchema(PathPatternTree patternTree) { + return new SchemaTree(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java index 79cab0f..cd26a35 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java @@ -28,11 +28,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment; import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner; import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan; -import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; -import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator; -import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.*; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode; @@ -75,7 +71,9 @@ public class DistributionPlanner { public DistributedQueryPlan planFragments() { PlanNode rootAfterRewrite = rewriteSource(); + System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite)); PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite); + System.out.println(PlanNodeUtil.nodeToString(rootWithExchange)); SubPlan subPlan = splitFragment(rootWithExchange); List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan); return new DistributedQueryPlan( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java index 3e016be..950e487 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java @@ -104,8 +104,8 @@ public class ExchangeNode extends PlanNode { public String toString() { return String.format( - "ExchangeNode-%s: [SourceNodeId: %s, SourceAddress:%s]", - getId(), remoteSourceNode.getId(), getSourceAddress()); + "ExchangeNode-%s: [SourceAddress:%s]", + getId(), getSourceAddress()); } public String getSourceAddress() { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java index 40f78fb..aba4a00 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java @@ -138,7 +138,7 @@ public class DistributionPlannerTest { timeJoinNode.addChild( new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2"))); timeJoinNode.addChild( - new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1"))); + new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d333.s1"))); LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode); @@ -164,15 +164,16 @@ public class DistributionPlannerTest { dataPartitionMap = new HashMap<>(); Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap = new HashMap<>(); + List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>(); d1DataRegions.add( new DataRegionReplicaSet( new DataRegionId(1), - Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000)))); + Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000)))); d1DataRegions.add( new DataRegionReplicaSet( new DataRegionId(2), - Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000)))); + Arrays.asList(new EndPoint("192.0.2.1", 9000), new EndPoint("192.0.2.2", 9000)))); Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>(); d1DataRegionMap.put(new TimePartitionId(), d1DataRegions); @@ -180,12 +181,26 @@ public class DistributionPlannerTest { d2DataRegions.add( new DataRegionReplicaSet( new DataRegionId(3), - Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000)))); + Arrays.asList(new EndPoint("192.0.3.1", 9000), new EndPoint("192.0.3.2", 9000)))); Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>(); d2DataRegionMap.put(new TimePartitionId(), d2DataRegions); + List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>(); + d3DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(1), + Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000)))); + d3DataRegions.add( + new DataRegionReplicaSet( + new DataRegionId(4), + Arrays.asList(new EndPoint("192.0.4.1", 9000), new EndPoint("192.0.4.2", 9000)))); + Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new HashMap<>(); + d3DataRegionMap.put(new TimePartitionId(), d3DataRegions); + sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap); sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap); + sgPartitionMap.put(new DeviceGroupId(device3.length()), d3DataRegionMap); + dataPartitionMap.put("root.sg", sgPartitionMap); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java new file mode 100644 index 0000000..bf8580b --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.sql.plan; + +import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.common.SessionInfo; +import org.apache.iotdb.db.mpp.execution.QueryExecution; +import org.apache.iotdb.db.mpp.sql.analyze.QueryType; +import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator; +import org.apache.iotdb.db.mpp.sql.statement.Statement; +import org.junit.Test; + +import java.time.ZoneId; + +public class QueryPlannerTest { + + @Test + public void TestSqlToDistributedPlan() { + + String querySql = "SELECT d1.*, d22.s1 FROM root.sg"; + + Statement stmt = StatementGenerator.createStatement(querySql, ZoneId.systemDefault()); + System.out.println(stmt); + + QueryExecution queryExecution = new QueryExecution(stmt, new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ)); + System.out.println(queryExecution); + } +}
