This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch xkf_id_table in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5c6a1e7fe8492be365007234fbab242547b8784 Author: 151250176 <[email protected]> AuthorDate: Wed Dec 8 11:43:06 2021 +0800 add test --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 22 +- .../engine/storagegroup/StorageGroupProcessor.java | 68 +-- .../service/TriggerRegistrationService.java | 24 +- .../org/apache/iotdb/db/metadata/MManager.java | 47 +- .../iotdb/db/metadata/id_table/IDManagerImpl.java | 3 +- .../apache/iotdb/db/metadata/id_table/IDTable.java | 209 +++++---- .../metadata/id_table/entry/DeviceIDFactory.java | 3 +- .../id_table/entry/InsertMeasurementMNode.java | 9 +- .../db/metadata/id_table/entry/SchemaEntry.java | 17 +- .../iotdb/db/qp/physical/crud/InsertPlan.java | 7 +- .../iotdb/db/metadata/id_table/IDTableTest.java | 519 +++++++++++++++------ .../db/metadata/id_table/entry/DeviceIDTest.java | 7 +- .../metadata/id_table/entry/SchemaEntryTest.java | 7 +- .../metadata/id_table/trigger_example/Counter.java | 96 ++++ 15 files changed, 735 insertions(+), 315 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index a007cc1..a28c8c4 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -18,11 +18,6 @@ */ package org.apache.iotdb.db.conf; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; - -import java.io.File; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.compaction.CompactionPriority; import org.apache.iotdb.db.engine.compaction.cross.CrossCompactionStrategy; @@ -39,9 +34,16 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.fileSystem.FSType; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + public class IoTDBConfig { /* Names of Watermark methods */ diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index bbf74ac..56bf3e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -18,16 +18,6 @@ */ package org.apache.iotdb.db.conf; -import com.google.common.net.InetAddresses; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.Properties; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.compaction.CompactionPriority; @@ -41,9 +31,21 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.fileSystem.FSType; import org.apache.iotdb.tsfile.utils.FilePathUtils; + +import com.google.common.net.InetAddresses; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.Properties; + public class IoTDBDescriptor { private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index fbcd087..591b0f1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,39 +18,6 @@ */ package org.apache.iotdb.db.engine.storagegroup; -import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; -import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Deque; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; -import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -117,9 +84,44 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; + +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; +import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one * TsFileProcessor in the working status. <br> diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java index 7d2cf9c..f5f8f5f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java @@ -20,13 +20,16 @@ package org.apache.iotdb.db.engine.trigger.service; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.trigger.api.Trigger; import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor; import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TriggerExecutionException; import org.apache.iotdb.db.exception.TriggerManagementException; import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.id_table.IDTable; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; @@ -153,6 +156,15 @@ public class TriggerRegistrationService implements IService { executors.put(plan.getTriggerName(), executor); measurementMNode.setTriggerExecutor(executor); + + // update id table + try { + IDTable idTable = + StorageEngine.getInstance().getProcessor(plan.getFullPath().getDevicePath()).getIdTable(); + idTable.registerTrigger(plan.getFullPath(), measurementMNode); + } catch (StorageEngineException | MetadataException e) { + throw new TriggerManagementException(e.getMessage(), e); + } } public synchronized void deregister(DropTriggerPlan plan) throws TriggerManagementException { @@ -184,7 +196,7 @@ public class TriggerRegistrationService implements IService { } } - private void doDeregister(DropTriggerPlan plan) { + private void doDeregister(DropTriggerPlan plan) throws TriggerManagementException { TriggerExecutor executor = executors.remove(plan.getTriggerName()); executor.getMeasurementMNode().setTriggerExecutor(null); @@ -196,6 +208,16 @@ public class TriggerRegistrationService implements IService { TriggerClassLoaderManager.getInstance() .deregister(executor.getRegistrationInformation().getClassName()); + + // update id table + try { + PartialPath fullPath = executor.getMeasurementMNode().getPartialPath(); + IDTable idTable = + StorageEngine.getInstance().getProcessor(fullPath.getDevicePath()).getIdTable(); + idTable.deregisterTrigger(fullPath, executor.getMeasurementMNode()); + } catch (StorageEngineException | MetadataException e) { + throw new TriggerManagementException(e.getMessage(), e); + } } public void activate(StartTriggerPlan plan) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 9a8adbd..253c6e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -18,26 +18,6 @@ */ package org.apache.iotdb.db.metadata; -import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -116,9 +96,31 @@ import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + /** * This class takes the responsibility of serialization of all the metadata info and persistent it * into files. This class contains all the interfaces to modify the metadata for delta system. All @@ -453,7 +455,7 @@ public class MManager { // including create and delete public void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException { - createTimeseries(plan, -1); + createTimeseriesEntry(plan, -1); } public void createTimeseriesEntry(CreateTimeSeriesPlan plan, long offset) @@ -567,7 +569,7 @@ public class MManager { List<TSEncoding> encodings, List<CompressionType> compressors) throws MetadataException { - createAlignedTimeSeries( + createAlignedTimeSeriesEntry( new CreateAlignedTimeSeriesPlan( prefixPath, measurements, dataTypes, encodings, compressors, null)); } @@ -1286,6 +1288,7 @@ public class MManager { /** Get storage group node by path. the give path don't need to be storage group path. */ public IStorageGroupMNode getStorageGroupNodeByPath(PartialPath path) throws MetadataException { + ensureStorageGroup(path); return mtree.getStorageGroupNodeByPath(path); } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDManagerImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDManagerImpl.java index 46babf5..27003ec 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDManagerImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDManagerImpl.java @@ -19,11 +19,12 @@ package org.apache.iotdb.db.metadata.id_table; -import java.util.Map; import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.tsfile.utils.Pair; +import java.util.Map; + public class IDManagerImpl implements IDManager { /** storage group name -> ID table */ diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java index ff9f84d..d69ba49 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java @@ -40,12 +40,9 @@ import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; -import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.service.IoTDB; -import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -77,6 +74,12 @@ public class IDTable { } } + /** + * create aligned timeseries + * + * @param plan create aligned timeseries plan + * @throws MetadataException if the device is not aligned, throw it + */ public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan) throws MetadataException { DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true); @@ -89,6 +92,12 @@ public class IDTable { } } + /** + * create timeseries + * + * @param plan create timeseries plan + * @throws MetadataException if the device is aligned, throw it + */ public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException { DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false); SchemaEntry schemaEntry = @@ -97,68 +106,12 @@ public class IDTable { } /** - * check whether a time series is exist if exist, check the type consistency if not exist, call - * MManager to create it + * check inserting timeseries existence and fill their measurement mnode * - * @return measurement MNode of the time series or null if type is not match + * @param plan insert plan + * @return reusable device id + * @throws MetadataException if insert plan's aligned value is inconsistent with device */ - public synchronized IMeasurementMNode getOrCreateMeasurementIfNotExist( - DeviceEntry deviceEntry, InsertPlan plan, int loc) throws MetadataException { - String measurementName = plan.getMeasurements()[loc]; - PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), measurementName); - - SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName); - - // if not exist, we create it - if (schemaEntry == null) { - if (!config.isAutoCreateSchemaEnabled()) { - throw new PathNotExistException(seriesKey.toString()); - } - - // create new timeseries in mmanager - try { - if (plan.isAligned()) { - List<TSEncoding> encodings = new ArrayList<>(); - List<CompressionType> compressors = new ArrayList<>(); - for (TSDataType dataType : plan.getDataTypes()) { - encodings.add(getDefaultEncoding(dataType)); - compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); - } - - CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = - new CreateAlignedTimeSeriesPlan( - plan.getDeviceId(), - Arrays.asList(plan.getMeasurements()), - Arrays.asList(plan.getDataTypes()), - encodings, - compressors, - null); - - IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan); - } else { - CreateTimeSeriesPlan createTimeSeriesPlan = - new CreateTimeSeriesPlan( - seriesKey, - plan.getDataTypes()[loc], - getDefaultEncoding(plan.getDataTypes()[loc]), - TSFileDescriptor.getInstance().getConfig().getCompressor(), - null, - null, - null, - null); - - IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1); - } - } catch (MetadataException e) { - logger.error("create timeseries failed, path is:" + seriesKey); - } - - schemaEntry = deviceEntry.getSchemaEntry(measurementName); - } - - return new InsertMeasurementMNode(measurementName, schemaEntry); - } - public synchronized IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException { PartialPath devicePath = plan.getDeviceId(); String[] measurementList = plan.getMeasurements(); @@ -236,6 +189,108 @@ public class IDTable { } /** + * register trigger to the timeseries + * + * @param fullPath full path of the timeseries + * @param measurementMNode the timeseries measurement mnode + * @throws MetadataException if the timeseries is not exits + */ + public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode) + throws MetadataException { + boolean isAligned = measurementMNode.getParent().isAligned(); + DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned); + + deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger(); + } + + /** + * deregister trigger to the timeseries + * + * @param fullPath full path of the timeseries + * @param measurementMNode the timeseries measurement mnode + * @throws MetadataException if the timeseries is not exits + */ + public synchronized void deregisterTrigger( + PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException { + boolean isAligned = measurementMNode.getParent().isAligned(); + DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned); + + deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger(); + } + + /** + * check whether a time series is exist if exist, check the type consistency if not exist, call + * MManager to create it + * + * @return measurement MNode of the time series or null if type is not match + */ + private IMeasurementMNode getOrCreateMeasurementIfNotExist( + DeviceEntry deviceEntry, InsertPlan plan, int loc) throws MetadataException { + String measurementName = plan.getMeasurements()[loc]; + PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), measurementName); + + SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName); + + // if not exist, we create it + if (schemaEntry == null) { + if (!config.isAutoCreateSchemaEnabled()) { + throw new PathNotExistException(seriesKey.toString()); + } + + // create new timeseries in mmanager + try { + if (plan.isAligned()) { + // create aligned timeseries + List<TSEncoding> encodings = new ArrayList<>(); + List<CompressionType> compressors = new ArrayList<>(); + for (TSDataType dataType : plan.getDataTypes()) { + encodings.add(getDefaultEncoding(dataType)); + compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); + } + + CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = + new CreateAlignedTimeSeriesPlan( + plan.getDeviceId(), + Arrays.asList(plan.getMeasurements()), + Arrays.asList(plan.getDataTypes()), + encodings, + compressors, + null); + + IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan); + } else { + // create normal timeseries + CreateTimeSeriesPlan createTimeSeriesPlan = + new CreateTimeSeriesPlan( + seriesKey, + plan.getDataTypes()[loc], + getDefaultEncoding(plan.getDataTypes()[loc]), + TSFileDescriptor.getInstance().getConfig().getCompressor(), + null, + null, + null, + null); + + IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1); + } + } catch (MetadataException e) { + logger.error("create timeseries failed, path is:" + seriesKey); + } + + schemaEntry = deviceEntry.getSchemaEntry(measurementName); + } + + // timeseries is using trigger, we should get trigger from mmanager + if (schemaEntry.isUsingTrigger()) { + IMeasurementMNode measurementMNode = IoTDB.metaManager.getMeasurementMNode(seriesKey); + return new InsertMeasurementMNode( + measurementName, schemaEntry, measurementMNode.getTriggerExecutor()); + } + + return new InsertMeasurementMNode(measurementName, schemaEntry); + } + + /** * get device id from device path and check is aligned, * * @param deviceName device name of the time series @@ -308,17 +363,6 @@ public class IDTable { // from mmanger private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType dataType) throws MetadataException { - // TSDataType insertDataType; - // if (plan instanceof InsertRowPlan) { - // if (!((InsertRowPlan) plan).isNeedInferType()) { - // // only when InsertRowPlan's values is object[], we should check type - // insertDataType = getTypeInLoc(plan, loc); - // } else { - // insertDataType = dataType; - // } - // } else { - // insertDataType = getTypeInLoc(plan, loc); - // } TSDataType insertDataType = plan.getDataTypes()[loc]; if (dataType != insertDataType) { String measurement = plan.getMeasurements()[loc]; @@ -330,21 +374,4 @@ public class IDTable { throw new DataTypeMismatchException(measurement, insertDataType, dataType); } } - - /** get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan */ - private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException { - TSDataType dataType; - if (plan instanceof InsertRowPlan) { - InsertRowPlan tPlan = (InsertRowPlan) plan; - dataType = - TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType()); - } else if (plan instanceof InsertTabletPlan) { - dataType = (plan).getDataTypes()[loc]; - } else { - throw new MetadataException( - String.format( - "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType())); - } - return dataType; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java index 4967536..7628ed6 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDFactory.java @@ -19,10 +19,11 @@ package org.apache.iotdb.db.metadata.id_table.entry; -import java.util.function.Function; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.metadata.path.PartialPath; +import java.util.function.Function; + /** factory to build device id according to configured algorithm */ public class DeviceIDFactory { Function<PartialPath, IDeviceID> getDeviceIDFunction; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java index eb952d5..1969a45 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/InsertMeasurementMNode.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.metadata.id_table.entry; -import java.util.Map; import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor; import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; import org.apache.iotdb.db.metadata.logfile.MLogWriter; @@ -34,6 +33,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; +import java.util.Map; + public class InsertMeasurementMNode implements IMeasurementMNode { SchemaEntry schemaEntry; @@ -42,6 +43,11 @@ public class InsertMeasurementMNode implements IMeasurementMNode { IMeasurementSchema schema; public InsertMeasurementMNode(String measurementId, SchemaEntry schemaEntry) { + this(measurementId, schemaEntry, null); + } + + public InsertMeasurementMNode( + String measurementId, SchemaEntry schemaEntry, TriggerExecutor executor) { this.schemaEntry = schemaEntry; schema = new UnaryMeasurementSchema( @@ -49,6 +55,7 @@ public class InsertMeasurementMNode implements IMeasurementMNode { schemaEntry.getTSDataType(), schemaEntry.getTSEncoding(), schemaEntry.getCompressionType()); + triggerExecutor = executor; } // region support methods diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java index 7daaf8c..017daf3 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.metadata.id_table.entry; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; - import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -29,6 +27,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + public class SchemaEntry implements ILastCacheContainer { /* 39 bits of disk pointer */ @@ -96,6 +96,19 @@ public class SchemaEntry implements ILastCacheContainer { flushTime = Math.max(flushTime, lastFlushTime); } + public boolean isUsingTrigger() { + return ((schema >> 24) & 1) == 1; + } + + public void setUsingTrigger() { + schema |= (1 << 24); + } + + public void setUnUsingTrigger() { + int mask = ~(1 << 24); + schema &= mask; + } + public long getLastTime() { return lastTime; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index a31b459..9276b71 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -19,9 +19,6 @@ package org.apache.iotdb.db.qp.physical.crud; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; @@ -30,6 +27,10 @@ import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + public abstract class InsertPlan extends PhysicalPlan { protected PartialPath deviceId; diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java index fbdc776..60d0774 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java @@ -19,17 +19,28 @@ package org.apache.iotdb.db.metadata.id_table; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Arrays; +import java.util.Collections; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.qp.Planner; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -119,11 +130,16 @@ public class IDTableTest { try { IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false); idTable.getSeriesSchemas(insertRowPlan2); - fail(); - } catch (Exception e) { - + fail("should throw exception"); + } catch (DataTypeMismatchException e) { + assertEquals( + "DataType mismatch, Insert measurement s2 type DOUBLE, metadata tree type INT64", + e.getMessage()); + } catch (Exception e2) { + fail("throw wrong exception"); } + IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -164,6 +180,7 @@ public class IDTableTest { columns[1] = 10000 + ""; columns[2] = 100 + ""; + // non aligned plan InsertRowPlan insertRowPlan = new InsertRowPlan( new PartialPath("root.laptop.d1.aligned_device"), @@ -171,13 +188,21 @@ public class IDTableTest { new String[] {"s1", "s2", "s3"}, dataTypes, columns, - true); + false); insertRowPlan.setMeasurementMNodes( new IMeasurementMNode[insertRowPlan.getMeasurements().length]); // call getSeriesSchemasAndReadLockDevice - IDeviceID deviceID = idTable.getSeriesSchemas(insertRowPlan); - // assertEquals(3, manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**"))); + try { + idTable.getSeriesSchemas(insertRowPlan); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e2) { + fail("throw wrong exception"); + } } catch (Exception e) { e.printStackTrace(); @@ -185,136 +210,352 @@ public class IDTableTest { } } - // @Test - // public void testCreateAlignedTimeseriesAndInsertWithNotAlignedData() { - // MManager manager = IoTDB.metaManager; - // try { - // manager.setStorageGroup(new PartialPath("root.laptop")); - // manager.createAlignedTimeSeries( - // new PartialPath("root.laptop.d1.aligned_device"), - // Arrays.asList("s1", "s2", "s3"), - // Arrays.asList( - // TSDataType.valueOf("FLOAT"), - // TSDataType.valueOf("INT64"), - // TSDataType.valueOf("INT32")), - // Arrays.asList( - // TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")), - // Arrays.asList(compressionType, compressionType, compressionType)); - // - // // construct an insertRowPlan with mismatched data type - // long time = 1L; - // TSDataType[] dataTypes = - // new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32}; - // - // String[] columns = new String[3]; - // columns[0] = "1.0"; - // columns[1] = "2"; - // columns[2] = "3"; - // - // InsertRowPlan insertRowPlan = - // new InsertRowPlan( - // new PartialPath("root.laptop.d1.aligned_device"), - // time, - // new String[] {"s1", "s2", "s3"}, - // dataTypes, - // columns, - // false); - // insertRowPlan.setMeasurementMNodes( - // new IMeasurementMNode[insertRowPlan.getMeasurements().length]); - // - // // call getSeriesSchemasAndReadLockDevice - // manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); - // } catch (Exception e) { - // e.printStackTrace(); - // Assert.assertEquals( - // "Timeseries under path [root.laptop.d1.aligned_device] is aligned , please - // setInsertPlan.isAligned() = true", - // e.getMessage()); - // } - // } - // - // @Test - // public void testCreateTimeseriesAndInsertWithMismatchDataType() { - // MManager manager = IoTDB.metaManager; - // try { - // manager.setStorageGroup(new PartialPath("root.laptop")); - // manager.createTimeseries( - // new PartialPath("root.laptop.d1.s0"), - // TSDataType.valueOf("INT32"), - // TSEncoding.valueOf("RLE"), - // compressionType, - // Collections.emptyMap()); - // - // // construct an insertRowPlan with mismatched data type - // long time = 1L; - // TSDataType[] dataTypes = new TSDataType[] {TSDataType.FLOAT}; - // - // String[] columns = new String[1]; - // columns[0] = 2.0 + ""; - // - // InsertRowPlan insertRowPlan = - // new InsertRowPlan( - // new PartialPath("root.laptop.d1"), time, new String[] {"s0"}, dataTypes, columns); - // insertRowPlan.setMeasurementMNodes( - // new IMeasurementMNode[insertRowPlan.getMeasurements().length]); - // - // // call getSeriesSchemasAndReadLockDevice - // IMNode node = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); - // assertEquals(1, manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**"))); - // assertNull(insertRowPlan.getMeasurementMNodes()[0]); - // assertEquals(1, insertRowPlan.getFailedMeasurementNumber()); - // - // } catch (Exception e) { - // e.printStackTrace(); - // fail(e.getMessage()); - // } - // } - // - // @Test - // public void testCreateTimeseriesAndInsertWithAlignedData() { - // MManager manager = IoTDB.metaManager; - // try { - // manager.setStorageGroup(new PartialPath("root.laptop")); - // manager.createTimeseries( - // new PartialPath("root.laptop.d1.aligned_device.s1"), - // TSDataType.valueOf("INT32"), - // TSEncoding.valueOf("RLE"), - // compressionType, - // Collections.emptyMap()); - // manager.createTimeseries( - // new PartialPath("root.laptop.d1.aligned_device.s2"), - // TSDataType.valueOf("INT64"), - // TSEncoding.valueOf("RLE"), - // compressionType, - // Collections.emptyMap()); - // - // // construct an insertRowPlan with mismatched data type - // long time = 1L; - // TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; - // - // String[] columns = new String[2]; - // columns[0] = "1"; - // columns[1] = "2"; - // - // InsertRowPlan insertRowPlan = - // new InsertRowPlan( - // new PartialPath("root.laptop.d1.aligned_device"), - // time, - // new String[] {"s1", "s2"}, - // dataTypes, - // columns, - // true); - // insertRowPlan.setMeasurementMNodes( - // new IMeasurementMNode[insertRowPlan.getMeasurements().length]); - // - // // call getSeriesSchemasAndReadLockDevice - // manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); - // } catch (Exception e) { - // e.printStackTrace(); - // Assert.assertEquals( - // "Timeseries under path [root.laptop.d1.aligned_device] is not aligned , please set - // InsertPlan.isAligned() = false", - // e.getMessage()); - // } - // } + @Test + public void testCreateTimeseriesAndInsert() { + MManager manager = IoTDB.metaManager; + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + manager.createTimeseries( + new PartialPath("root.laptop.d1.s0"), + TSDataType.valueOf("INT32"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + long time = 1L; + String[] columns = new String[1]; + columns[0] = 2 + ""; + + // correct insert plan + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1"), + time, + new String[] {"s0"}, + new TSDataType[] {TSDataType.INT32}, + columns); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan); + assertEquals(insertRowPlan.getMeasurementMNodes()[0].getSchema().getType(), TSDataType.INT32); + assertEquals(0, insertRowPlan.getFailedMeasurementNumber()); + + // construct an insertRowPlan with mismatched data type + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1"), + time, + new String[] {"s0"}, + new TSDataType[] {TSDataType.FLOAT}, + columns); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // get series schema + idTable.getSeriesSchemas(insertRowPlan2); + assertNull(insertRowPlan2.getMeasurementMNodes()[0]); + assertEquals(1, insertRowPlan2.getFailedMeasurementNumber()); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateTimeseriesAndInsertWithAlignedData() { + MManager manager = IoTDB.metaManager; + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s1"), + TSDataType.valueOf("INT32"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s2"), + TSDataType.valueOf("INT64"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e) { + fail("throw wrong exception"); + } + } + + @Test + public void testInsertAndAutoCreate() { + MManager manager = IoTDB.metaManager; + try { + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + false); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + + // check mmanager + IMeasurementMNode s1Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1")); + assertEquals("s1", s1Node.getName()); + assertEquals(TSDataType.INT32, s1Node.getSchema().getType()); + IMeasurementMNode s2Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2")); + assertEquals("s2", s2Node.getName()); + assertEquals(TSDataType.INT64, s2Node.getSchema().getType()); + + // insert type mismatch data + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + false); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan2); + + assertNull(insertRowPlan2.getMeasurementMNodes()[0]); + assertEquals(insertRowPlan.getMeasurementMNodes()[1].getSchema().getType(), TSDataType.INT64); + assertEquals(1, insertRowPlan2.getFailedMeasurementNumber()); + + // insert aligned data + InsertRowPlan insertRowPlan3 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + true); + insertRowPlan3.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + try { + idTable.getSeriesSchemas(insertRowPlan3); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e) { + fail("throw wrong exception"); + } + } catch (MetadataException | StorageEngineException e) { + e.printStackTrace(); + fail("throw exception"); + } + } + + @Test + public void testAlignedInsertAndAutoCreate() { + MManager manager = IoTDB.metaManager; + try { + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + + // check mmanager + IMeasurementMNode s1Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s1")); + assertEquals("s1", s1Node.getName()); + assertEquals(TSDataType.INT32, s1Node.getSchema().getType()); + IMeasurementMNode s2Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s2")); + assertEquals("s2", s2Node.getName()); + assertEquals(TSDataType.INT64, s2Node.getSchema().getType()); + assertTrue(s2Node.getParent().isAligned()); + + // insert type mismatch data + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + true); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan2); + + assertNull(insertRowPlan2.getMeasurementMNodes()[0]); + assertEquals(insertRowPlan.getMeasurementMNodes()[1].getSchema().getType(), TSDataType.INT64); + assertEquals(1, insertRowPlan2.getFailedMeasurementNumber()); + + // insert non-aligned data + InsertRowPlan insertRowPlan3 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + false); + insertRowPlan3.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + try { + idTable.getSeriesSchemas(insertRowPlan3); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e) { + fail("throw wrong exception"); + } + } catch (MetadataException | StorageEngineException e) { + e.printStackTrace(); + fail("throw exception"); + } + } + + @Test + public void testTriggerAndInsert() { + MManager manager = IoTDB.metaManager; + try { + long time = 1L; + + manager.setStorageGroup(new PartialPath("root.laptop")); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s1"), + TSDataType.valueOf("INT32"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s2"), + TSDataType.valueOf("INT64"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + + Planner processor = new Planner(); + + String sql = + "CREATE TRIGGER trigger1 BEFORE INSERT ON root.laptop.d1.non_aligned_device.s1 AS 'org.apache.iotdb.db.metadata.id_table.trigger_example.Counter'"; + + CreateTriggerPlan plan = (CreateTriggerPlan) processor.parseSQLToPhysicalPlan(sql); + + TriggerRegistrationService.getInstance().register(plan); + + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + false); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + + // check mmanager + IMeasurementMNode s1Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1")); + assertEquals("s1", s1Node.getName()); + assertEquals(TSDataType.INT32, s1Node.getSchema().getType()); + assertNotNull(s1Node.getTriggerExecutor()); + + IMeasurementMNode s2Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2")); + assertEquals("s2", s2Node.getName()); + assertEquals(TSDataType.INT64, s2Node.getSchema().getType()); + assertNull(s2Node.getTriggerExecutor()); + + } catch (MetadataException | StorageEngineException | QueryProcessException e) { + e.printStackTrace(); + fail("throw exception"); + } + } } diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java index ba4096c..42c1cab 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/DeviceIDTest.java @@ -19,13 +19,14 @@ package org.apache.iotdb.db.metadata.id_table.entry; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.path.PartialPath; + import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + public class DeviceIDTest { @Test public void deviceIDBuildTest() throws IllegalPathException { diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java index e5c83f8..a91eb2d 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java @@ -19,16 +19,17 @@ package org.apache.iotdb.db.metadata.id_table.entry; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; -import static org.junit.Assert.assertEquals; - import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + import org.junit.Test; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; +import static org.junit.Assert.assertEquals; + public class SchemaEntryTest { @Test public void schemaEntryBuildTest() throws IllegalPathException { diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/trigger_example/Counter.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/trigger_example/Counter.java new file mode 100644 index 0000000..7c1d5df --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/trigger_example/Counter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.id_table.trigger_example; + +import org.apache.iotdb.db.engine.trigger.api.Trigger; +import org.apache.iotdb.db.engine.trigger.api.TriggerAttributes; +import org.apache.iotdb.tsfile.utils.Binary; + +public class Counter implements Trigger { + + public static final int BASE = 1377; + + private int counter = 0; + private boolean isStopped = true; + + @Override + public void onCreate(TriggerAttributes attributes) { + counter = BASE; + isStopped = false; + } + + @Override + public void onStart() { + isStopped = false; + } + + @Override + public void onStop() { + isStopped = true; + } + + @Override + public Integer fire(long timestamp, Integer value) { + ++counter; + return value; + } + + @Override + public Long fire(long timestamp, Long value) { + ++counter; + return value; + } + + @Override + public Float fire(long timestamp, Float value) { + ++counter; + return value; + } + + @Override + public Double fire(long timestamp, Double value) { + ++counter; + return value; + } + + @Override + public Boolean fire(long timestamp, Boolean value) { + ++counter; + return value; + } + + @Override + public Binary fire(long timestamp, Binary value) { + ++counter; + return value; + } + + public void setCounter(int counter) { + this.counter = counter; + } + + public int getCounter() { + return counter; + } + + public boolean isStopped() { + return isStopped; + } +}
