This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch serialize-physicalplan
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6e2dceadb3021cd3df9f8498948f0ef32199e9a0
Author: lta <[email protected]>
AuthorDate: Wed Mar 27 22:48:24 2019 +0800

    add author and loaddata plan codec
---
 .../iotdb/db/qp/physical/sys/LoadDataPlan.java     | 19 ++++++++++++
 .../iotdb/db/writelog/transfer/CodecInstances.java | 34 +++++++++++++++++++++-
 .../db/writelog/transfer/PhysicalPlanCodec.java    |  3 +-
 .../writelog/transfer/PhysicalPlanLogTransfer.java |  5 +++-
 .../db/writelog/transfer/SystemLogOperator.java    |  1 +
 .../transfer/PhysicalPlanLogTransferTest.java      | 27 ++++++++++++++++-
 6 files changed, 85 insertions(+), 4 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
index f2421bf..7e2eb7a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -54,4 +55,22 @@ public class LoadDataPlan extends PhysicalPlan {
   public String getMeasureType() {
     return measureType;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LoadDataPlan)) {
+      return false;
+    }
+    LoadDataPlan that = (LoadDataPlan) o;
+    return Objects.equals(getInputFilePath(), that.getInputFilePath()) &&
+        Objects.equals(getMeasureType(), that.getMeasureType());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getInputFilePath(), getMeasureType());
+  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
index 6c37c17..22fc9e4 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
 import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
 import org.apache.iotdb.db.utils.ByteBufferUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -312,7 +313,7 @@ public class CodecInstances {
       if (permissionListLen != -1) {
         permissions = new HashSet<>(permissionListLen);
         for(int i =0 ; i < permissionListLen; i ++) {
-           permissions.add(buffer.getInt());
+          permissions.add(buffer.getInt());
         }
       }
       AuthorPlan authorPlan = null;
@@ -327,4 +328,35 @@ public class CodecInstances {
     }
   };
 
+  static final Codec<LoadDataPlan> loadDataPlanCodec = new 
Codec<LoadDataPlan>() {
+    ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>();
+
+    @Override
+    public byte[] encode(LoadDataPlan plan) {
+      int type = SystemLogOperator.LOADDATA;
+      if (localBuffer.get() == null) {
+        localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize()));
+      }
+      ByteBuffer buffer = localBuffer.get();
+      buffer.clear();
+      buffer.put((byte) type);
+
+      ByteBufferUtils.putString(buffer, plan.getInputFilePath());
+      ByteBufferUtils.putString(buffer, plan.getMeasureType());
+
+      return Arrays.copyOfRange(buffer.array(), 0, buffer.position());
+    }
+
+    @Override
+    public LoadDataPlan decode(byte[] bytes) throws IOException {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+      buffer.get(); // read and skip an int representing "type"
+
+      String inputFilePath = ByteBufferUtils.readString(buffer);
+      String measureType = ByteBufferUtils.readString(buffer);
+      return new LoadDataPlan(inputFilePath, measureType);
+    }
+  };
+
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
index 3ce856c..008dc15 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
@@ -39,7 +39,8 @@ public enum PhysicalPlanCodec {
   UPDATEPLAN(SystemLogOperator.UPDATE, CodecInstances.updatePlanCodec),
   DELETEPLAN(SystemLogOperator.DELETE, CodecInstances.deletePlanCodec),
   METADATAPLAN(SystemLogOperator.METADATA, CodecInstances.metadataPlanCodec),
-  AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec);
+  AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec),
+  LOADDATAPLAN(SystemLogOperator.LOADDATA, CodecInstances.loadDataPlanCodec);
 
   private static final HashMap<Integer, PhysicalPlanCodec> codecMap = new 
HashMap<>();
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
index 22ef179..76d27ee 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
 import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
 
 public class PhysicalPlanLogTransfer {
@@ -46,7 +47,9 @@ public class PhysicalPlanLogTransfer {
       codec = (Codec<PhysicalPlan>) 
PhysicalPlanCodec.fromOpcode(SystemLogOperator.METADATA).codec;
     } else if (plan instanceof AuthorPlan) {
       codec = (Codec<PhysicalPlan>) 
PhysicalPlanCodec.fromOpcode(SystemLogOperator.AUTHOR).codec;
-    } else {
+    } else if (plan instanceof LoadDataPlan) {
+      codec = (Codec<PhysicalPlan>) 
PhysicalPlanCodec.fromOpcode(SystemLogOperator.LOADDATA).codec;
+    } else{
       throw new UnsupportedOperationException(
           "SystemLogOperator given is not supported. " + 
plan.getOperatorType());
     }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
index ebc46e3..f8e9bed 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
@@ -30,4 +30,5 @@ public class SystemLogOperator {
   public static final int DELETE = 2;
   public static final int METADATA = 3;
   public static final int AUTHOR = 4;
+  public static final int LOADDATA = 5;
 }
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
index d91496c..bb93aa7 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
 import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
 import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -44,25 +45,30 @@ public class PhysicalPlanLogTransferTest {
   private DeletePlan deletePlan = new DeletePlan(50, new 
Path("root.vehicle.device"));
   private UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
       new Path("root.vehicle.device.sensor"));
+  private LoadDataPlan loadDataPlan = new 
LoadDataPlan("/tmp/data/vehicle","sensor");
 
   @Test
   public void operatorToLog()
       throws IOException, ArgsErrorException, ProcessorException, 
QueryProcessorException {
+    /** Insert Plan test **/
     byte[] insertPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(insertPlan);
     Codec<InsertPlan> insertPlanCodec = CodecInstances.multiInsertPlanCodec;
     byte[] insertPlanProperty = insertPlanCodec.encode(insertPlan);
     assertEquals(true, Arrays.equals(insertPlanProperty, insertPlanBytesTest));
 
+    /** Delete Plan test **/
     byte[] deletePlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(deletePlan);
     Codec<DeletePlan> deletePlanCodec = CodecInstances.deletePlanCodec;
     byte[] deletePlanProperty = deletePlanCodec.encode(deletePlan);
     assertEquals(true, Arrays.equals(deletePlanProperty, deletePlanBytesTest));
 
+    /** Update Plan test **/
     byte[] updatePlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(updatePlan);
     Codec<UpdatePlan> updatePlanCodec = CodecInstances.updatePlanCodec;
     byte[] updatePlanProperty = updatePlanCodec.encode(updatePlan);
     assertEquals(true, Arrays.equals(updatePlanProperty, updatePlanBytesTest));
 
+    /** Metadata Plan test **/
     String metadataStatement = "create timeseries root.vehicle.d1.s1 with 
datatype=INT32,encoding=RLE";
     MetadataPlan metadataPlan = (MetadataPlan) 
processor.parseSQLToPhysicalPlan(metadataStatement);
     byte[] metadataPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(metadataPlan);
@@ -70,6 +76,7 @@ public class PhysicalPlanLogTransferTest {
     byte[] metadataPlanProperty = metadataPlanCodec.encode(metadataPlan);
     assertEquals(true, Arrays.equals(metadataPlanProperty, 
metadataPlanBytesTest));
 
+    /** Author Plan test **/
     String sql = "grant role xm privileges 
'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor";
     AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql);
     byte[] authorPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(authorPlan);
@@ -77,27 +84,37 @@ public class PhysicalPlanLogTransferTest {
     byte[] authorPlanProperty = authorPlanCodec.encode(authorPlan);
     assertEquals(true, Arrays.equals(authorPlanProperty, authorPlanBytesTest));
 
+    /** LoadData Plan test **/
+    byte[] loadDataPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(loadDataPlan);
+    Codec<LoadDataPlan> loadDataPlanCodec = CodecInstances.loadDataPlanCodec;
+    byte[] loadDataPlanProperty = loadDataPlanCodec.encode(loadDataPlan);
+    assertEquals(true, Arrays.equals(loadDataPlanProperty, 
loadDataPlanBytesTest));
+
   }
 
   @Test
   public void logToOperator()
       throws IOException, ArgsErrorException, ProcessorException, 
QueryProcessorException, AuthException {
 
+    /** Insert Plan test **/
     byte[] insertPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(insertPlan);
     InsertPlan insertPlanTest = (InsertPlan) PhysicalPlanLogTransfer
         .logToOperator(insertPlanBytesTest);
     assertEquals(true, insertPlanTest.equals(insertPlan));
 
+    /** Delete Plan test **/
     byte[] deletePlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(deletePlan);
     DeletePlan deletePlanTest = (DeletePlan) PhysicalPlanLogTransfer
         .logToOperator(deletePlanBytesTest);
     assertEquals(true, deletePlanTest.equals(deletePlan));
 
+    /** Update Plan test **/
     byte[] updatePlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(updatePlan);
     UpdatePlan updatePlanTest = (UpdatePlan) PhysicalPlanLogTransfer
         .logToOperator(updatePlanBytesTest);
     assertEquals(true, updatePlanTest.equals(updatePlan));
 
+    /** Metadata Plan test **/
     String metadataStatement = "create timeseries root.vehicle.d1.s1 with 
datatype=INT32,encoding=RLE";
     MetadataPlan metadataPlan = (MetadataPlan) 
processor.parseSQLToPhysicalPlan(metadataStatement);
     byte[] metadataPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(metadataPlan);
@@ -105,11 +122,19 @@ public class PhysicalPlanLogTransferTest {
         .logToOperator(metadataPlanBytesTest);
     assertEquals(true, metadataPlanTest.equals(metadataPlan));
 
+    /** Author Plan test **/
     String sql = "grant role xm privileges 
'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor";
     AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql);
     byte[] authorPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(authorPlan);
-    AuthorPlan authorPlanTest = (AuthorPlan) 
PhysicalPlanLogTransfer.logToOperator(authorPlanBytesTest);
+    AuthorPlan authorPlanTest = (AuthorPlan) PhysicalPlanLogTransfer
+        .logToOperator(authorPlanBytesTest);
     assertEquals(true, authorPlanTest.equals(authorPlan));
 
+    /** LoadData Plan test **/
+    byte[] loadDataPlanBytesTest = 
PhysicalPlanLogTransfer.operatorToLog(loadDataPlan);
+    LoadDataPlan loadDataPlanTest = (LoadDataPlan) PhysicalPlanLogTransfer
+        .logToOperator(loadDataPlanBytesTest);
+    assertEquals(true, loadDataPlan.equals(loadDataPlanTest));
+
   }
 }
\ No newline at end of file

Reply via email to