This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch serialize-physicalplan in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 6e2dceadb3021cd3df9f8498948f0ef32199e9a0 Author: lta <[email protected]> AuthorDate: Wed Mar 27 22:48:24 2019 +0800 add author and loaddata plan codec --- .../iotdb/db/qp/physical/sys/LoadDataPlan.java | 19 ++++++++++++ .../iotdb/db/writelog/transfer/CodecInstances.java | 34 +++++++++++++++++++++- .../db/writelog/transfer/PhysicalPlanCodec.java | 3 +- .../writelog/transfer/PhysicalPlanLogTransfer.java | 5 +++- .../db/writelog/transfer/SystemLogOperator.java | 1 + .../transfer/PhysicalPlanLogTransferTest.java | 27 ++++++++++++++++- 6 files changed, 85 insertions(+), 4 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java index f2421bf..7e2eb7a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.physical.sys; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.tsfile.read.common.Path; @@ -54,4 +55,22 @@ public class LoadDataPlan extends PhysicalPlan { public String getMeasureType() { return measureType; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LoadDataPlan)) { + return false; + } + LoadDataPlan that = (LoadDataPlan) o; + return Objects.equals(getInputFilePath(), that.getInputFilePath()) && + Objects.equals(getMeasureType(), that.getMeasureType()); + } + + @Override + public int hashCode() { + return Objects.hash(getInputFilePath(), getMeasureType()); + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java index 6c37c17..22fc9e4 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan; import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; import org.apache.iotdb.db.utils.ByteBufferUtils; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -312,7 +313,7 @@ public class CodecInstances { if (permissionListLen != -1) { permissions = new HashSet<>(permissionListLen); for(int i =0 ; i < permissionListLen; i ++) { - permissions.add(buffer.getInt()); + permissions.add(buffer.getInt()); } } AuthorPlan authorPlan = null; @@ -327,4 +328,35 @@ public class CodecInstances { } }; + static final Codec<LoadDataPlan> loadDataPlanCodec = new Codec<LoadDataPlan>() { + ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); + + @Override + public byte[] encode(LoadDataPlan plan) { + int type = SystemLogOperator.LOADDATA; + if (localBuffer.get() == null) { + localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); + } + ByteBuffer buffer = localBuffer.get(); + buffer.clear(); + buffer.put((byte) type); + + ByteBufferUtils.putString(buffer, plan.getInputFilePath()); + ByteBufferUtils.putString(buffer, plan.getMeasureType()); + + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public LoadDataPlan decode(byte[] bytes) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + buffer.get(); // read and skip an int representing "type" + + String inputFilePath = ByteBufferUtils.readString(buffer); + String measureType = ByteBufferUtils.readString(buffer); + return new LoadDataPlan(inputFilePath, measureType); + } + }; + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java index 3ce856c..008dc15 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java @@ -39,7 +39,8 @@ public enum PhysicalPlanCodec { UPDATEPLAN(SystemLogOperator.UPDATE, CodecInstances.updatePlanCodec), DELETEPLAN(SystemLogOperator.DELETE, CodecInstances.deletePlanCodec), METADATAPLAN(SystemLogOperator.METADATA, CodecInstances.metadataPlanCodec), - AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec); + AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec), + LOADDATAPLAN(SystemLogOperator.LOADDATA, CodecInstances.loadDataPlanCodec); private static final HashMap<Integer, PhysicalPlanCodec> codecMap = new HashMap<>(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java index 22ef179..76d27ee 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan; import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; public class PhysicalPlanLogTransfer { @@ -46,7 +47,9 @@ public class PhysicalPlanLogTransfer { codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.METADATA).codec; } else if (plan instanceof AuthorPlan) { codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.AUTHOR).codec; - } else { + } else if (plan instanceof LoadDataPlan) { + codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.LOADDATA).codec; + } else{ throw new UnsupportedOperationException( "SystemLogOperator given is not supported. " + plan.getOperatorType()); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java index ebc46e3..f8e9bed 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java @@ -30,4 +30,5 @@ public class SystemLogOperator { public static final int DELETE = 2; public static final int METADATA = 3; public static final int AUTHOR = 4; + public static final int LOADDATA = 5; } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java index d91496c..bb93aa7 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan; import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; import org.apache.iotdb.db.qp.utils.MemIntQpExecutor; import org.apache.iotdb.tsfile.read.common.Path; @@ -44,25 +45,30 @@ public class PhysicalPlanLogTransferTest { private DeletePlan deletePlan = new DeletePlan(50, new Path("root.vehicle.device")); private UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.vehicle.device.sensor")); + private LoadDataPlan loadDataPlan = new LoadDataPlan("/tmp/data/vehicle","sensor"); @Test public void operatorToLog() throws IOException, ArgsErrorException, ProcessorException, QueryProcessorException { + /** Insert Plan test **/ byte[] insertPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(insertPlan); Codec<InsertPlan> insertPlanCodec = CodecInstances.multiInsertPlanCodec; byte[] insertPlanProperty = insertPlanCodec.encode(insertPlan); assertEquals(true, Arrays.equals(insertPlanProperty, insertPlanBytesTest)); + /** Delete Plan test **/ byte[] deletePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(deletePlan); Codec<DeletePlan> deletePlanCodec = CodecInstances.deletePlanCodec; byte[] deletePlanProperty = deletePlanCodec.encode(deletePlan); assertEquals(true, Arrays.equals(deletePlanProperty, deletePlanBytesTest)); + /** Update Plan test **/ byte[] updatePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(updatePlan); Codec<UpdatePlan> updatePlanCodec = CodecInstances.updatePlanCodec; byte[] updatePlanProperty = updatePlanCodec.encode(updatePlan); assertEquals(true, Arrays.equals(updatePlanProperty, updatePlanBytesTest)); + /** Metadata Plan test **/ String metadataStatement = "create timeseries root.vehicle.d1.s1 with datatype=INT32,encoding=RLE"; MetadataPlan metadataPlan = (MetadataPlan) processor.parseSQLToPhysicalPlan(metadataStatement); byte[] metadataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(metadataPlan); @@ -70,6 +76,7 @@ public class PhysicalPlanLogTransferTest { byte[] metadataPlanProperty = metadataPlanCodec.encode(metadataPlan); assertEquals(true, Arrays.equals(metadataPlanProperty, metadataPlanBytesTest)); + /** Author Plan test **/ String sql = "grant role xm privileges 'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor"; AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql); byte[] authorPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(authorPlan); @@ -77,27 +84,37 @@ public class PhysicalPlanLogTransferTest { byte[] authorPlanProperty = authorPlanCodec.encode(authorPlan); assertEquals(true, Arrays.equals(authorPlanProperty, authorPlanBytesTest)); + /** LoadData Plan test **/ + byte[] loadDataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(loadDataPlan); + Codec<LoadDataPlan> loadDataPlanCodec = CodecInstances.loadDataPlanCodec; + byte[] loadDataPlanProperty = loadDataPlanCodec.encode(loadDataPlan); + assertEquals(true, Arrays.equals(loadDataPlanProperty, loadDataPlanBytesTest)); + } @Test public void logToOperator() throws IOException, ArgsErrorException, ProcessorException, QueryProcessorException, AuthException { + /** Insert Plan test **/ byte[] insertPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(insertPlan); InsertPlan insertPlanTest = (InsertPlan) PhysicalPlanLogTransfer .logToOperator(insertPlanBytesTest); assertEquals(true, insertPlanTest.equals(insertPlan)); + /** Delete Plan test **/ byte[] deletePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(deletePlan); DeletePlan deletePlanTest = (DeletePlan) PhysicalPlanLogTransfer .logToOperator(deletePlanBytesTest); assertEquals(true, deletePlanTest.equals(deletePlan)); + /** Update Plan test **/ byte[] updatePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(updatePlan); UpdatePlan updatePlanTest = (UpdatePlan) PhysicalPlanLogTransfer .logToOperator(updatePlanBytesTest); assertEquals(true, updatePlanTest.equals(updatePlan)); + /** Metadata Plan test **/ String metadataStatement = "create timeseries root.vehicle.d1.s1 with datatype=INT32,encoding=RLE"; MetadataPlan metadataPlan = (MetadataPlan) processor.parseSQLToPhysicalPlan(metadataStatement); byte[] metadataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(metadataPlan); @@ -105,11 +122,19 @@ public class PhysicalPlanLogTransferTest { .logToOperator(metadataPlanBytesTest); assertEquals(true, metadataPlanTest.equals(metadataPlan)); + /** Author Plan test **/ String sql = "grant role xm privileges 'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor"; AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql); byte[] authorPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(authorPlan); - AuthorPlan authorPlanTest = (AuthorPlan) PhysicalPlanLogTransfer.logToOperator(authorPlanBytesTest); + AuthorPlan authorPlanTest = (AuthorPlan) PhysicalPlanLogTransfer + .logToOperator(authorPlanBytesTest); assertEquals(true, authorPlanTest.equals(authorPlan)); + /** LoadData Plan test **/ + byte[] loadDataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(loadDataPlan); + LoadDataPlan loadDataPlanTest = (LoadDataPlan) PhysicalPlanLogTransfer + .logToOperator(loadDataPlanBytesTest); + assertEquals(true, loadDataPlan.equals(loadDataPlanTest)); + } } \ No newline at end of file
