This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch improve_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/improve_wal by this push:
new e2a379a fix issues
e2a379a is described below
commit e2a379a69184ff5cb7ac3f38955274c9781e71be
Author: jt <[email protected]>
AuthorDate: Mon Oct 12 16:14:03 2020 +0800
fix issues
---
.../java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java | 14 ++++++--------
.../apache/iotdb/db/qp/physical/crud/InsertRowPlan.java | 3 +--
.../apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java | 8 ++++----
.../iotdb/db/writelog/node/DifferentialWriteLogNode.java | 5 +++--
4 files changed, 14 insertions(+), 16 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 594c195..499898d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -138,7 +138,7 @@ public abstract class PhysicalPlan {
*
* @param buffer
*/
- public void serialize(ByteBuffer buffer, PhysicalPlan base, int baseIndex) {
+ public void serialize(ByteBuffer buffer, PhysicalPlan base) {
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}
@@ -305,31 +305,29 @@ public abstract class PhysicalPlan {
public static PhysicalPlan create(ByteBuffer buffer,
Queue<PhysicalPlan> planWindow) throws IOException,
IllegalPathException {
+ int baseIndex = buffer.getInt();
int typeNum = buffer.get();
if (typeNum >= PhysicalPlanType.values().length) {
throw new IOException("unrecognized log type " + typeNum);
}
PhysicalPlanType type = PhysicalPlanType.values()[typeNum];
PhysicalPlan plan;
- int index;
switch (type) {
case INSERT:
plan = new InsertRowPlan();
- index = buffer.getInt();
- if (index < 0) {
+ if (baseIndex < 0) {
plan.deserialize(buffer);
} else {
- InsertRowPlan baseInsertRowPlan = (InsertRowPlan)
getPlan(planWindow, index);
+ InsertRowPlan baseInsertRowPlan = (InsertRowPlan)
getPlan(planWindow, baseIndex);
plan.deserialize(buffer, baseInsertRowPlan);
}
break;
case BATCHINSERT:
plan = new InsertTabletPlan();
- index = buffer.getInt();
- if (index < 0) {
+ if (baseIndex < 0) {
plan.deserialize(buffer);
} else {
- InsertTabletPlan baseInsertTabletPlan = (InsertTabletPlan)
getPlan(planWindow, index);
+ InsertTabletPlan baseInsertTabletPlan = (InsertTabletPlan)
getPlan(planWindow, baseIndex);
plan.deserialize(buffer, baseInsertTabletPlan);
}
break;
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 5b719ce..d060c12 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -414,11 +414,10 @@ public class InsertRowPlan extends InsertPlan {
}
@Override
- public void serialize(ByteBuffer buffer, PhysicalPlan base, int baseIndex) {
+ public void serialize(ByteBuffer buffer, PhysicalPlan base) {
InsertRowPlan baseInsertRowPlan = (InsertRowPlan) base;
int type = PhysicalPlanType.INSERT.ordinal();
buffer.put((byte) type);
- buffer.putInt(baseIndex);
putDiffTime(this.getTime(), baseInsertRowPlan.getTime(), buffer);
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 5c824ff..ef1fd77 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -186,10 +186,9 @@ public class InsertTabletPlan extends InsertPlan {
}
@Override
- public void serialize(ByteBuffer buffer, PhysicalPlan base, int baseIndex) {
+ public void serialize(ByteBuffer buffer, PhysicalPlan base) {
int type = PhysicalPlanType.BATCHINSERT.ordinal();
buffer.put((byte) type);
- buffer.putInt(baseIndex);
buffer
.putInt(this.getMeasurements().length - (this.getFailedMeasurements()
== null ? 0 :
@@ -416,11 +415,12 @@ public class InsertTabletPlan extends InsertPlan {
int timeSize = buffer.getInt();
byte[] bytes = new byte[timeSize];
buffer.get(bytes);
+ ByteBuffer tBuffer = ByteBuffer.wrap(bytes);
int i = 0;
try {
- while (defaultTimeDecoder.hasNext(buffer) && i < number) {
- times[i++] = defaultTimeDecoder.readLong(buffer);
+ while (defaultTimeDecoder.hasNext(tBuffer) && i < number) {
+ times[i++] = defaultTimeDecoder.readLong(tBuffer);
}
} catch (IOException e) {
logger.error("Cannot decode time of {}", this);
diff --git
a/server/src/main/java/org/apache/iotdb/db/writelog/node/DifferentialWriteLogNode.java
b/server/src/main/java/org/apache/iotdb/db/writelog/node/DifferentialWriteLogNode.java
index 280181f..6b47cea 100644
---
a/server/src/main/java/org/apache/iotdb/db/writelog/node/DifferentialWriteLogNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/writelog/node/DifferentialWriteLogNode.java
@@ -78,7 +78,7 @@ public class DifferentialWriteLogNode extends
ExclusiveWriteLogNode {
private void serialize(PhysicalPlan plan, Pair<PhysicalPlan, Integer>
similarPlanIndex) {
if (similarPlanIndex == null) {
- plan.serialize(logBuffer);
+ serializeNonDifferentially(plan);
} else {
serializeDifferentially(plan, similarPlanIndex);
}
@@ -100,7 +100,8 @@ public class DifferentialWriteLogNode extends
ExclusiveWriteLogNode {
}
private void serializeDifferentially(InsertPlan plan, InsertPlan base, int
index) {
- plan.serialize(logBuffer, base, index);
+ logBuffer.putInt(index);
+ plan.serialize(logBuffer, base);
}
private Pair<PhysicalPlan, Integer> findSimilarPlan(PhysicalPlan plan) {