This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit db3eab0588888c61626efe9faac5f743b66813ec Author: liuminghui233 <[email protected]> AuthorDate: Mon Nov 14 20:31:48 2022 +0800 support shuffle --- client-py/SessionExample.py | 2 +- .../db/mpp/plan/planner/LogicalPlanVisitor.java | 12 ++++--- .../service/thrift/impl/ClientRPCServiceImpl.java | 8 +++-- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 38 +++++++++++++++++----- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py index 4f04855def..e50521ad18 100644 --- a/client-py/SessionExample.py +++ b/client-py/SessionExample.py @@ -45,7 +45,7 @@ fetch_args = { "end_time": 32, "interval": 4, "sliding_step": 1, - "indexes": [0, 3, 5, 9] + "indexes": [9, 0, 5, 3] } print(session.fetch_window_batch(ts_path_list, None, fetch_args)) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index 3d3c02b2e8..14496b69cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -74,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; /** * This visitor is used to generate a logical plan for the statement and returns the {@link @@ -294,16 +295,17 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte public PlanNode visitFetchWindowBatch( FetchWindowBatchStatement fetchWindowBatchStatement, MPPQueryContext context) { LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); + List<Integer> sortedSamplingIndexes = + fetchWindowBatchStatement.getSamplingIndexes().stream() + .sorted() + .collect(Collectors.toList()); planBuilder .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null) .planTransform( analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC) - .planWindowSplit( - fetchWindowBatchStatement.getGroupByTimeParameter(), - fetchWindowBatchStatement.getSamplingIndexes()) + .planWindowSplit(fetchWindowBatchStatement.getGroupByTimeParameter(), sortedSamplingIndexes) .planWindowConcat( - fetchWindowBatchStatement.getGroupByTimeParameter(), - fetchWindowBatchStatement.getSamplingIndexes()); + fetchWindowBatchStatement.getGroupByTimeParameter(), sortedSamplingIndexes); return planBuilder.getRoot(); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index ab027fb138..814fd22deb 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement; +import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement; @@ -459,7 +460,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSFetchWindowBatchResp resp = createTSFetchWindowBatchResp(queryExecution.getDatasetHeader()); resp.setWindowBatchDataSetList( - QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(queryExecution)); + QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList( + queryExecution, ((FetchWindowBatchStatement) s).getSamplingIndexes())); return resp; } } catch (Exception e) { @@ -514,7 +516,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { TSFetchWindowBatchResp resp = createTSFetchWindowBatchResp(queryExecution.getDatasetHeader()); - resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution)); + resp.setWindowBatch( + QueryDataSetUtils.convertTsBlocksToWindowBatch( + queryExecution, ((FetchWindowBatchStatement) s).getSamplingIndexes())); return resp; } } catch (Exception e) { diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 8880ca2810..b0d26a7b0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; /** TimeValuePairUtils to convert between thrift format and TsFile format. */ public class QueryDataSetUtils { @@ -400,9 +401,9 @@ public class QueryDataSetUtils { return res; } - public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(IQueryExecution queryExecution) - throws IoTDBException { - List<List<ByteBuffer>> windowSet = new ArrayList<>(); + public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch( + IQueryExecution queryExecution, List<Integer> samplingIndexes) throws IoTDBException { + List<List<ByteBuffer>> sortedWindowBatch = new ArrayList<>(); while (true) { Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult(); @@ -423,14 +424,24 @@ public class QueryDataSetUtils { res.add(byteBuffer); } - windowSet.add(res); + sortedWindowBatch.add(res); } - return windowSet; + + List<List<ByteBuffer>> windowBatch = new ArrayList<>(sortedWindowBatch.size()); + List<Integer> sortedSamplingIndexes = + samplingIndexes.stream().sorted().collect(Collectors.toList()); + + for (Integer samplingIndex : samplingIndexes) { + int mapIndex = sortedSamplingIndexes.indexOf(samplingIndex); + windowBatch.add(sortedWindowBatch.get(mapIndex)); + } + return windowBatch; } public static List<TSQueryDataSet> convertTsBlocksToWindowBatchDataSetList( - IQueryExecution queryExecution) throws IoTDBException, IOException { - List<TSQueryDataSet> windowSet = new ArrayList<>(); + IQueryExecution queryExecution, List<Integer> samplingIndexes) + throws IoTDBException, IOException { + List<TSQueryDataSet> sortedWindowBatch = new ArrayList<>(); int columnNum = queryExecution.getOutputValueColumnCount(); // one time column and each value column has an actual value buffer and a bitmap value to @@ -626,9 +637,18 @@ public class QueryDataSetUtils { tsQueryDataSet.setBitmapList(bitmapList); tsQueryDataSet.setValueList(valueList); - windowSet.add(tsQueryDataSet); + sortedWindowBatch.add(tsQueryDataSet); + } + + List<TSQueryDataSet> windowBatch = new ArrayList<>(sortedWindowBatch.size()); + List<Integer> sortedSamplingIndexes = + samplingIndexes.stream().sorted().collect(Collectors.toList()); + + for (Integer samplingIndex : samplingIndexes) { + int mapIndex = sortedSamplingIndexes.indexOf(samplingIndex); + windowBatch.add(sortedWindowBatch.get(mapIndex)); } - return windowSet; + return windowBatch; } public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
