This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FIDig
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/FIDig by this push:
new 2fe93289c2 FI DIG
2fe93289c2 is described below
commit 2fe93289c2f26d36ab61b8a3c09593c40ca1d696
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Apr 11 10:14:40 2023 +0800
FI DIG
---
.../src/main/java/org/apache/iotdb/QueryTest.java | 59 ++++++++++++++++++++++
.../db/mpp/plan/execution/QueryExecution.java | 7 +++
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 4 ++
.../db/mpp/plan/planner/plan/FragmentInstance.java | 7 ++-
.../db/mpp/plan/planner/plan/PlanFragment.java | 10 ++++
.../planner/plan/node/process/DeviceViewNode.java | 16 ++++++
.../planner/plan/node/sink/IdentitySinkNode.java | 9 ++++
.../db/mpp/plan/scheduler/ClusterScheduler.java | 1 +
.../scheduler/FragmentInstanceDispatcherImpl.java | 3 ++
.../service/thrift/impl/ClientRPCServiceImpl.java | 2 +
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +
.../plan/node/sink/IdentitySinkNodeSerdeTest.java | 4 +-
...st.java => AlignedSeriesScanNodeSerdeTest.java} | 30 ++++++-----
.../plan/node/source/SeriesScanNodeSerdeTest.java | 2 +
14 files changed, 142 insertions(+), 14 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/QueryTest.java
b/example/session/src/main/java/org/apache/iotdb/QueryTest.java
new file mode 100644
index 0000000000..b30c30078b
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/QueryTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+public class QueryTest {
+
+ private static Session session;
+ private static Session sessionEnableRedirect;
+
+ private static final String HOST = "192.168.130.9";
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException {
+ session =
+ new Session.Builder()
+ .host(HOST)
+ .port(6667)
+ .username("root")
+ .password("root")
+ .version(Version.V_1_0)
+ .build();
+ session.open(false);
+
+ // set session fetchSize
+ session.setFetchSize(10000);
+ long startTime = System.currentTimeMillis();
+ String querySql =
+ "select * from root.** where time >= 2022-01-01T00:00:01.000+08:00 and time <= 2022-01-01T00:01:00.000+08:00 align by device";
+ try (SessionDataSet dataSet = session.executeQueryStatement(querySql)) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ }
+ }
+ System.out.printf("Cost: %d", System.currentTimeMillis() - startTime);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index d6e87610ab..b592eb2467 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -155,7 +155,9 @@ public class QueryExecution implements IQueryExecution {
this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
+ long startTime = System.currentTimeMillis();
this.analysis = analyze(statement, context, partitionFetcher,
schemaFetcher);
+ logger.info("Cost of Analyze is: {}ms", System.currentTimeMillis() -
startTime);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
@@ -205,8 +207,13 @@ public class QueryExecution implements IQueryExecution {
// check timeout for query first
checkTimeOutForQuery();
+ long currentTime;
+ currentTime = System.currentTimeMillis();
doLogicalPlan();
+ logger.info("Cost of doLogicalPlan is: {}ms", System.currentTimeMillis() -
currentTime);
+ currentTime = System.currentTimeMillis();
doDistributedPlan();
+ logger.info("Cost of doDistributedPlan is: {}ms",
System.currentTimeMillis() - currentTime);
// update timeout after finishing plan stage
context.setTimeOut(
context.getTimeOut() - (System.currentTimeMillis() -
context.getStartTime()));
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 30a982bc74..cf4f34a03e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -62,7 +62,11 @@ public class LocalExecutionPlanner {
// Generate pipelines, return the last pipeline data structure
// TODO Replace operator with operatorFactory to build multiple driver for one pipeline
+ long currentTime;
+ currentTime = System.currentTimeMillis();
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ LOGGER.info(
+ "Cost of OperatorTree construction is: {}ms",
System.currentTimeMillis() - currentTime);
// check whether current free memory is enough to execute current query
long estimatedMemorySize = checkMemory(root,
instanceContext.getStateMachine());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 97bd7e1748..52c61078ac 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -46,7 +46,7 @@ import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
- private final Logger logger =
LoggerFactory.getLogger(FragmentInstance.class);
+ private static final Logger logger =
LoggerFactory.getLogger(FragmentInstance.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -170,6 +170,8 @@ public class FragmentInstance implements IConsensusRequest {
}
public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
+ logger.info("Size of FI bytebuffer is: {}B", buffer.capacity());
+ long startTime = System.currentTimeMillis();
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
PlanFragment planFragment = PlanFragment.deserialize(buffer);
long timeOut = ReadWriteIOUtils.readLong(buffer);
@@ -183,12 +185,14 @@ public class FragmentInstance implements
IConsensusRequest {
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ?
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
+ logger.info("Cost of deserialize FI is: {}ms", System.currentTimeMillis()
- startTime);
return fragmentInstance;
}
public ByteBuffer serializeToByteBuffer() {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+ long startTime = System.currentTimeMillis();
id.serialize(outputStream);
fragment.serialize(outputStream);
ReadWriteIOUtils.write(timeOut, outputStream);
@@ -205,6 +209,7 @@ public class FragmentInstance implements IConsensusRequest {
if (hostDataNode != null) {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode,
outputStream);
}
+ logger.info("cost of Serializing a FI is: {}ms",
System.currentTimeMillis() - startTime);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
logger.error("Unexpected error occurs when serializing this FragmentInstance.", e);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index 5fd8b79a7a..0ba832e1c9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -30,6 +30,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -47,6 +50,8 @@ public class PlanFragment {
// indicate whether this PlanFragment is the root of the whole Fragment-Plan-Tree or not
private boolean isRoot;
+ private final Logger logger = LoggerFactory.getLogger(PlanFragment.class);
+
public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) {
this.id = id;
this.planNodeTree = planNodeTree;
@@ -145,13 +150,18 @@ public class PlanFragment {
public void serialize(DataOutputStream stream) throws IOException {
id.serialize(stream);
+ long bytes = stream.size();
planNodeTree.serialize(stream);
+ logger.info("Size of planNodeTree in PlanFragment is: {}", stream.size() -
bytes);
+ bytes = stream.size();
+
if (typeProvider == null) {
ReadWriteIOUtils.write((byte) 0, stream);
} else {
ReadWriteIOUtils.write((byte) 1, stream);
typeProvider.serialize(stream);
}
+ logger.info("Size of TypeProvider in PlanFragment is: {}", stream.size() -
bytes);
}
public static PlanFragment deserialize(ByteBuffer byteBuffer) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 8517904d21..877a4925af 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -25,6 +25,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -58,6 +61,8 @@ public class DeviceViewNode extends MultiChildProcessNode {
// not 0 because device is the first column
private final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DeviceViewNode.class);
+
public DeviceViewNode(
PlanNodeId id,
OrderByParameter mergeOrderParameter,
@@ -143,17 +148,26 @@ public class DeviceViewNode extends MultiChildProcessNode
{
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ int initPos = stream.size();
+ int pos = stream.size();
PlanNodeType.DEVICE_VIEW.serialize(stream);
mergeOrderParameter.serializeAttributes(stream);
ReadWriteIOUtils.write(outputColumnNames.size(), stream);
+ LOGGER.info("Size of outputColumnNamesList is : {}",
outputColumnNames.size());
for (String column : outputColumnNames) {
ReadWriteIOUtils.write(column, stream);
}
+ LOGGER.info("Size of outputColumnNames is : {}", stream.size() - pos);
+ pos = stream.size();
ReadWriteIOUtils.write(devices.size(), stream);
+ LOGGER.info("Size of deviceList is : {}", devices.size());
for (String deviceName : devices) {
ReadWriteIOUtils.write(deviceName, stream);
}
+ LOGGER.info("Size of devices is : {}", stream.size() - pos);
ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), stream);
+ pos = stream.size();
+ LOGGER.info("deviceToMeasurementIndexMap.size() is : {}",
deviceToMeasurementIndexesMap.size());
for (Map.Entry<String, List<Integer>> entry :
deviceToMeasurementIndexesMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), stream);
ReadWriteIOUtils.write(entry.getValue().size(), stream);
@@ -161,6 +175,8 @@ public class DeviceViewNode extends MultiChildProcessNode {
ReadWriteIOUtils.write(index, stream);
}
}
+ LOGGER.info("Size of deviceToMeasurementIndexMap is : {}", stream.size() -
pos);
+ LOGGER.info("Size of DeviceViewNode attributes is : {}", stream.size() -
initPos);
}
public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
index 30f0cab31c..1b8221a980 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -26,6 +26,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,6 +38,8 @@ import java.util.stream.Collectors;
public class IdentitySinkNode extends MultiChildrenSinkNode {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IdentitySinkNode.class);
+
public IdentitySinkNode(PlanNodeId id) {
super(id);
}
@@ -81,14 +86,18 @@ public class IdentitySinkNode extends MultiChildrenSinkNode
{
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
PlanNodeType.IDENTITY_SINK.serialize(stream);
+ int pos = stream.size();
+ LOGGER.info("DownStreamLocationList.size() is {}",
downStreamChannelLocationList.size());
ReadWriteIOUtils.write(downStreamChannelLocationList.size(), stream);
for (DownStreamChannelLocation downStreamChannelLocation :
downStreamChannelLocationList) {
downStreamChannelLocation.serialize(stream);
}
+ LOGGER.info("Size of IdentitySinkNode attributes is : {}", stream.size() -
pos);
}
public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
int size = ReadWriteIOUtils.readInt(byteBuffer);
+ LOGGER.info("DownStreamLocationList.size() is {}", size);
List<DownStreamChannelLocation> downStreamChannelLocationList = new
ArrayList<>();
for (int i = 0; i < size; i++) {
downStreamChannelLocationList.add(DownStreamChannelLocation.deserialize(byteBuffer));
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index d704726266..8487b644c3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -119,6 +119,7 @@ public class ClusterScheduler implements IScheduler {
// So we need to start the state fetcher after the dispatching stage.
try {
FragInstanceDispatchResult result = dispatchResultFuture.get();
+ logger.info("Dispatch cost is: {}ms", (System.nanoTime() - startTime) /
1000000);
if (!result.isSuccessful()) {
if (needRetry(result.getFailureStatus())) {
stateMachine.transitionToPendingRetry(result.getFailureStatus());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index b990e0e4fb..2c9753d1d7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -256,8 +256,11 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
sendFragmentInstanceReq.setConsensusGroupId(
instance.getRegionReplicaSet().getRegionId());
}
+ long startTime = System.currentTimeMillis();
TSendFragmentInstanceResp sendFragmentInstanceResp =
client.sendFragmentInstance(sendFragmentInstanceReq);
+ logger.info(
+ "Cost of sendFI in dispatchRemote is : {}ms",
System.currentTimeMillis() - startTime);
if (!sendFragmentInstanceResp.accepted) {
logger.warn(sendFragmentInstanceResp.message);
throw new FragmentInstanceDispatchException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 6c1f310195..be89e81e87 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -200,7 +200,9 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long startTime = System.currentTimeMillis();
StatementType statementType = null;
try {
+ long start = System.currentTimeMillis();
Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
+ LOGGER.info("Cost of createStatement is: {}ms",
System.currentTimeMillis() - start);
if (s == null) {
return RpcUtils.getTSExecuteStatementResp(
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index c50c74e841..43f3c0f0f1 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -244,6 +244,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSendFragmentInstanceResp
sendFragmentInstance(TSendFragmentInstanceReq req) {
LOGGER.debug("receive FragmentInstance to group[{}]",
req.getConsensusGroupId());
+ long startTime = System.currentTimeMillis();
// deserialize ConsensusGroupId
ConsensusGroupId groupId = null;
@@ -278,6 +279,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
resp.setAccepted(executionResult.isAccepted());
resp.setMessage(executionResult.getMessage());
+ LOGGER.info("Cost of sendFI is: {}ms", System.currentTimeMillis() -
startTime);
// TODO
return resp;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
index da3882dc30..6d985d3aba 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
@@ -40,7 +40,9 @@ public class IdentitySinkNodeSerdeTest {
public void testSerializeAndDeserialize() throws IllegalPathException {
DownStreamChannelLocation downStreamChannelLocation =
new DownStreamChannelLocation(
- new TEndPoint("test", 1), new TFragmentInstanceId("test", 1,
"test"), "test");
+ new TEndPoint("192.168.130.9", 6667),
+ new TFragmentInstanceId("20230410_061855_00000_2", 1, "test"),
+ "300000");
IdentitySinkNode identitySinkNode1 =
new IdentitySinkNode(
new PlanNodeId("testIdentitySinkNode"),
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/AlignedSeriesScanNodeSerdeTest.java
similarity index 65%
copy from
server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
copy to
server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/AlignedSeriesScanNodeSerdeTest.java
index 8b40e80cee..0c5577191c 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/AlignedSeriesScanNodeSerdeTest.java
@@ -16,32 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.plan.plan.node.source;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Arrays;
-import static org.junit.Assert.assertEquals;
-
-public class SeriesScanNodeSerdeTest {
-
+public class AlignedSeriesScanNodeSerdeTest {
@Test
public void testSerializeAndDeserialize() throws QueryProcessException,
IllegalPathException {
- SeriesScanNode seriesScanNode =
- new SeriesScanNode(
- new PlanNodeId("TestSeriesScanNode"),
- new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+ AlignedSeriesScanNode seriesScanNode =
+ new AlignedSeriesScanNode(
+ new PlanNodeId("3000000"),
+ new AlignedPath(
+ "root.iot.DC004HP1MCY01M0008221075DB_999140",
+ Arrays.asList("q", "v"),
+ Arrays.asList(
+ new MeasurementSchema("q", TSDataType.INT32),
+ new MeasurementSchema("v", TSDataType.INT32))),
Ordering.DESC,
new GroupByFilter(1, 2, 3, 4),
null,
@@ -50,8 +54,10 @@ public class SeriesScanNodeSerdeTest {
null);
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
+ System.out.println(byteBuffer.position());
seriesScanNode.serialize(byteBuffer);
+ System.out.println(byteBuffer.position());
byteBuffer.flip();
- assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer),
seriesScanNode);
+ // assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), seriesScanNode);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
index 8b40e80cee..8079600354 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
@@ -50,7 +50,9 @@ public class SeriesScanNodeSerdeTest {
null);
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
+ System.out.println(byteBuffer.position());
seriesScanNode.serialize(byteBuffer);
+ System.out.println(byteBuffer.position());
byteBuffer.flip();
assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer),
seriesScanNode);
}