This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch xkf_id_table in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 233bd92a53fa9bd9a69872e15fdb8d4dbe432df7 Merge: a5c6a1e 7aabea4 Author: 151250176 <[email protected]> AuthorDate: Wed Dec 8 11:49:36 2021 +0800 Merge branch 'master' of git://github.com/apache/iotdb into xkf_id_table # Conflicts: # server/src/main/java/org/apache/iotdb/db/metadata/MManager.java .../protocol/influxdb}/input/InfluxLineProtocol.g4 | 2 +- .../iotdb/cluster/server/ClusterRPCService.java | 2 +- .../iotdb/cluster/server/ClusterTSServiceImpl.java | 2 +- docker/ReadMe.md | 1 + docker/src/main/Dockerfile-single | 1 + .../UserGuide/Ecosystem Integration/Spark IoTDB.md | 53 ++++- .../UserGuide/Ecosystem Integration/Spark IoTDB.md | 58 ++++- influxdb-protocol/pom.xml | 85 ++++---- .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java | 87 ++------ .../iotdb/influxdb/IoTDBInfluxDBFactory.java | 2 +- .../iotdb/influxdb/example/InfluxDBExample.java | 36 +-- .../protocol/constant/InfluxDBConstant.java | 2 - .../iotdb/influxdb/protocol/dto/IoTDBPoint.java | 65 ------ .../iotdb/influxdb/protocol/dto/SessionPoint.java | 52 ++++- .../protocol/impl/IoTDBInfluxDBService.java | 131 +++-------- .../influxdb/protocol/meta/MetaManagerHolder.java | 54 ----- .../influxdb/protocol/util/DataTypeUtils.java | 86 -------- .../iotdb/influxdb/session/InfluxDBSession.java | 242 +++++++++++++++++++++ .../influxdb/integration/IoTDBInfluxDBIT.java | 94 +------- pom.xml | 2 + server/pom.xml | 10 + .../org/apache/iotdb/db/concurrent/ThreadName.java | 2 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 35 ++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 + .../org/apache/iotdb/db/metadata/MManager.java | 68 +++--- .../apache/iotdb/db/metadata/id_table/IDTable.java | 16 +- .../apache/iotdb/db/metrics/ui/MetricsPage.java | 2 +- .../influxdb}/constant/InfluxDBConstant.java | 5 +- .../iotdb/db/protocol/influxdb/dto/IoTDBPoint.java | 128 ++++++----- .../protocol/influxdb}/input/InfluxLineParser.java | 21 +- .../db/protocol/influxdb}/meta/MetaManager.java | 127 +++++++---- .../db/protocol/influxdb}/meta/TagInfoRecords.java | 30 ++- .../dataset/RawQueryDataSetWithoutValueFilter.java | 40 +++- .../iotdb/db/rest/impl/RestApiServiceImpl.java | 2 +- .../{RPCService.java => InfluxDBRPCService.java} | 46 ++-- .../iotdb/db/service/InfluxDBRPCServiceMBean.java | 10 +- .../java/org/apache/iotdb/db/service/IoTDB.java | 4 + .../org/apache/iotdb/db/service/RPCService.java | 2 + .../org/apache/iotdb/db/service/ServiceType.java | 1 + .../org/apache/iotdb/db/service/StaticResps.java | 4 +- .../db/service/basic/BasicServiceProvider.java | 2 +- .../handler/InfluxDBServiceThriftHandler.java | 59 +++++ .../handler}/RPCServiceThriftHandler.java | 4 +- .../service/thrift/impl/InfluxDBServiceImpl.java | 155 +++++++++++++ .../service/{ => thrift/impl}/TSServiceImpl.java | 6 +- .../org/apache/iotdb/db/utils/DataTypeUtils.java | 149 +++++++++++++ .../org/apache/iotdb/db/utils/ParameterUtils.java | 20 +- .../apache/iotdb/db/utils/RandomDeleteCache.java | 76 ------- .../iotdb/db/metadata/id_table/IDTableTest.java | 18 +- .../influxdb}/input/InfluxLineParserTest.java | 3 +- service-rpc/pom.xml | 5 + .../iotdb/rpc/InfluxDBSynchronizedHandler.java | 56 +++++ .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 38 ++++ .../iotdb/rpc/StatementExecutionException.java | 7 + .../java/org/apache/iotdb/session/Session.java | 88 +------- .../apache/iotdb/session/util/SessionUtils.java | 90 ++++++++ spark-iotdb-connector/pom.xml | 36 +-- .../org/apache/iotdb/spark/db/Converter.scala | 6 +- .../org/apache/iotdb/spark/db/DefaultSource.scala | 30 ++- .../org/apache/iotdb/spark/db/IoTDBOptions.scala | 2 +- .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala | 7 +- .../iotdb/spark/db/tools/DataFrameTools.java | 162 ++++++++++++++ .../org/apache/iotdb/spark/db/IoTDBTest.scala | 32 ++- .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 117 ++++++++++ thrift-influxdb/README.md | 22 ++ {service-rpc => thrift-influxdb}/pom.xml | 90 ++------ thrift-influxdb/src/main/thrift/influxdb.thrift | 92 ++++++++ 67 files changed, 1914 insertions(+), 1073 deletions(-) diff --cc server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java index d69ba49,0000000..c5a895e mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java @@@ -1,377 -1,0 +1,379 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.id_table; + - import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; - - import java.util.ArrayList; - import java.util.Arrays; - import java.util.HashMap; - import java.util.List; - import java.util.Map; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.metadata.id_table.entry.DeviceEntry; +import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory; +import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID; +import org.apache.iotdb.db.metadata.id_table.entry.InsertMeasurementMNode; +import org.apache.iotdb.db.metadata.id_table.entry.SchemaEntry; +import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID; +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; ++ +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.HashMap; ++import java.util.List; ++import java.util.Map; ++ ++import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; ++ +/** id table belongs to a storage group and mapping timeseries path to it's schema */ +public class IDTable { + + // number of table slot + private static final int NUM_OF_SLOTS = 256; + /** logger */ + private static final Logger logger = LoggerFactory.getLogger(IDTable.class); + /** + * 256 hashmap for avoiding rehash performance issue and lock competition device ID -> + * (measurement name -> schema entry) + */ + private Map<IDeviceID, DeviceEntry>[] idTables; + /** disk schema manager to manage disk schema entry */ + private DiskSchemaManager diskSchemaManager; + /** iotdb config */ + protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + public IDTable() { + idTables = new Map[NUM_OF_SLOTS]; + for (int i = 0; i < NUM_OF_SLOTS; i++) { + idTables[i] = new HashMap<>(); + } + } + + /** + * create aligned timeseries + * + * @param plan create aligned timeseries plan + * @throws MetadataException if the device is not aligned, throw it + */ + public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan) + throws MetadataException { + DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true); + + for (int i = 0; i < plan.getMeasurements().size(); i++) { + SchemaEntry schemaEntry = + new SchemaEntry( + plan.getDataTypes().get(i), plan.getEncodings().get(i), plan.getCompressors().get(i)); + deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry); + } + } + + /** + * create timeseries + * + * @param plan create timeseries plan + * @throws MetadataException if the device is aligned, throw it + */ + public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException { + DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false); + SchemaEntry schemaEntry = + new SchemaEntry(plan.getDataType(), plan.getEncoding(), plan.getCompressor()); + deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry); + } + + /** + * check inserting timeseries existence and fill their measurement mnode + * + * @param plan insert plan + * @return reusable device id + * @throws MetadataException if insert plan's aligned value is inconsistent with device + */ + public synchronized IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException { + PartialPath devicePath = plan.getDeviceId(); + String[] measurementList = plan.getMeasurements(); + IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes(); + + // 1. get device entry and check align + DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned()); + + // 2. get schema of each measurement + for (int i = 0; i < measurementList.length; i++) { + try { + // get MeasurementMNode, auto create if absent + try { + IMeasurementMNode measurementMNode = + getOrCreateMeasurementIfNotExist(deviceEntry, plan, i); + + checkDataTypeMatch(plan, i, measurementMNode.getSchema().getType()); + measurementMNodes[i] = measurementMNode; + } catch (DataTypeMismatchException mismatchException) { + if (!config.isEnablePartialInsert()) { + throw mismatchException; + } else { + // mark failed measurement + plan.markFailedMeasurementInsertion(i, mismatchException); + } + } + } catch (MetadataException e) { + if (IoTDB.isClusterMode()) { + logger.debug( + "meet error when check {}.{}, message: {}", + devicePath, + measurementList[i], + e.getMessage()); + } else { + logger.warn( + "meet error when check {}.{}, message: {}", + devicePath, + measurementList[i], + e.getMessage()); + } + if (config.isEnablePartialInsert()) { + // mark failed measurement + plan.markFailedMeasurementInsertion(i, e); + } else { + throw e; + } + } + } + + return deviceEntry.getDeviceID(); + } + + /** + * update latest flushed time of one timeseries + * + * @param timeseriesID timeseries id + * @param flushTime latest flushed time + * @throws MetadataException throw if this timeseries is not exist + */ + public synchronized void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime) + throws MetadataException { + getSchemaEntry(timeseriesID).updateLastedFlushTime(flushTime); + } + + /** + * update latest flushed time of one timeseries + * + * @param timeseriesID timeseries id + * @return latest flushed time of one timeseries + * @throws MetadataException throw if this timeseries is not exist + */ + public synchronized long getLatestFlushedTime(TimeseriesID timeseriesID) + throws MetadataException { + return getSchemaEntry(timeseriesID).getFlushTime(); + } + + /** + * register trigger to the timeseries + * + * @param fullPath full path of the timeseries + * @param measurementMNode the timeseries measurement mnode + * @throws MetadataException if the timeseries is not exits + */ + public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode) + throws MetadataException { + boolean isAligned = measurementMNode.getParent().isAligned(); + DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned); + + deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger(); + } + + /** + * deregister trigger to the timeseries + * + * @param fullPath full path of the timeseries + * @param measurementMNode the timeseries measurement mnode + * @throws MetadataException if the timeseries is not exits + */ + public synchronized void deregisterTrigger( + PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException { + boolean isAligned = measurementMNode.getParent().isAligned(); + DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned); + + deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger(); + } + + /** + * check whether a time series is exist if exist, check the type consistency if not exist, call + * MManager to create it + * + * @return measurement MNode of the time series or null if type is not match + */ + private IMeasurementMNode getOrCreateMeasurementIfNotExist( + DeviceEntry deviceEntry, InsertPlan plan, int loc) throws MetadataException { + String measurementName = plan.getMeasurements()[loc]; + PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), measurementName); + + SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName); + + // if not exist, we create it + if (schemaEntry == null) { + if (!config.isAutoCreateSchemaEnabled()) { + throw new PathNotExistException(seriesKey.toString()); + } + + // create new timeseries in mmanager + try { + if (plan.isAligned()) { + // create aligned timeseries + List<TSEncoding> encodings = new ArrayList<>(); + List<CompressionType> compressors = new ArrayList<>(); + for (TSDataType dataType : plan.getDataTypes()) { + encodings.add(getDefaultEncoding(dataType)); + compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); + } + + CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = + new CreateAlignedTimeSeriesPlan( + plan.getDeviceId(), + Arrays.asList(plan.getMeasurements()), + Arrays.asList(plan.getDataTypes()), + encodings, + compressors, + null); + + IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan); + } else { + // create normal timeseries + CreateTimeSeriesPlan createTimeSeriesPlan = + new CreateTimeSeriesPlan( + seriesKey, + plan.getDataTypes()[loc], + getDefaultEncoding(plan.getDataTypes()[loc]), + TSFileDescriptor.getInstance().getConfig().getCompressor(), + null, + null, + null, + null); + + IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1); + } + } catch (MetadataException e) { + logger.error("create timeseries failed, path is:" + seriesKey); + } + + schemaEntry = deviceEntry.getSchemaEntry(measurementName); + } + + // timeseries is using trigger, we should get trigger from mmanager + if (schemaEntry.isUsingTrigger()) { + IMeasurementMNode measurementMNode = IoTDB.metaManager.getMeasurementMNode(seriesKey); + return new InsertMeasurementMNode( + measurementName, schemaEntry, measurementMNode.getTriggerExecutor()); + } + + return new InsertMeasurementMNode(measurementName, schemaEntry); + } + + /** + * get device id from device path and check is aligned, + * + * @param deviceName device name of the time series + * @param isAligned whether the insert plan is aligned + * @return device entry of the timeseries + */ + private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned) + throws MetadataException { + IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName); + int slot = calculateSlot(deviceID); + + DeviceEntry deviceEntry = idTables[slot].get(deviceID); + // new device + if (deviceEntry == null) { + deviceEntry = new DeviceEntry(deviceID); + deviceEntry.setAligned(isAligned); + idTables[slot].put(deviceID, deviceEntry); + + return deviceEntry; + } + + // check aligned + if (deviceEntry.isAligned() != isAligned) { + throw new MetadataException( + String.format( + "Timeseries under path [%s]'s align value is [%b], which is not consistent with insert plan", + deviceName, deviceEntry.isAligned())); + } + + // reuse device entry in map + return deviceEntry; + } + + /** + * calculate slot that this deviceID should in + * + * @param deviceID device id + * @return slot number + */ + private int calculateSlot(IDeviceID deviceID) { + return Math.abs(deviceID.hashCode()) % NUM_OF_SLOTS; + } + + /** + * get schema entry + * + * @param timeseriesID the timeseries ID + * @return schema entry of the timeseries + * @throws MetadataException throw if this timeseries is not exist + */ + private SchemaEntry getSchemaEntry(TimeseriesID timeseriesID) throws MetadataException { + IDeviceID deviceID = timeseriesID.getDeviceID(); + int slot = calculateSlot(deviceID); + + DeviceEntry deviceEntry = idTables[slot].get(deviceID); + if (deviceEntry == null) { + throw new MetadataException( + "update non exist timeseries's latest flushed time, timeseries id is: " + timeseriesID); + } + + SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(timeseriesID.getMeasurement()); + if (schemaEntry == null) { + throw new MetadataException( + "update non exist timeseries's latest flushed time, timeseries id is: " + timeseriesID); + } + + return schemaEntry; + } + + // from mmanger + private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType dataType) + throws MetadataException { + TSDataType insertDataType = plan.getDataTypes()[loc]; + if (dataType != insertDataType) { + String measurement = plan.getMeasurements()[loc]; + logger.warn( + "DataType mismatch, Insert measurement {} type {}, metadata tree type {}", + measurement, + insertDataType, + dataType); + throw new DataTypeMismatchException(measurement, insertDataType, dataType); + } + } +} diff --cc server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java index 60d0774,0000000..b41fc42 mode 100644,000000..100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java @@@ -1,561 -1,0 +1,563 @@@ +/// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ + +package org.apache.iotdb.db.metadata.id_table; + - import static org.junit.Assert.assertEquals; - import static org.junit.Assert.assertNotNull; - import static org.junit.Assert.assertNull; - import static org.junit.Assert.assertTrue; - import static org.junit.Assert.fail; - - import java.util.Arrays; - import java.util.Collections; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.qp.Planner; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; ++ +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + ++import java.util.Arrays; ++import java.util.Collections; ++ ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertNotNull; ++import static org.junit.Assert.assertNull; ++import static org.junit.Assert.assertTrue; ++import static org.junit.Assert.fail; ++ +public class IDTableTest { + + private CompressionType compressionType; + + @Before + public void setUp() { + compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testCreateAlignedTimeseriesAndInsert() { + MManager manager = IoTDB.metaManager; + + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + CreateAlignedTimeSeriesPlan plan = + new CreateAlignedTimeSeriesPlan( + new PartialPath("root.laptop.d1.aligned_device"), + Arrays.asList("s1", "s2", "s3"), + Arrays.asList( + TSDataType.valueOf("FLOAT"), + TSDataType.valueOf("INT64"), + TSDataType.valueOf("INT32")), + Arrays.asList( + TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")), + Arrays.asList(compressionType, compressionType, compressionType), + null); + + manager.createAlignedTimeSeriesEntry(plan); + + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = + new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32}; + + String[] columns = new String[3]; + columns[0] = 2.0 + ""; + columns[1] = 10000 + ""; + columns[2] = 100 + ""; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2", "s3"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan); + + // with type mismatch + dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32}; + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2", "s3"}, + dataTypes, + columns, + true); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // we should throw type mismatch exception here + try { + IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false); + idTable.getSeriesSchemas(insertRowPlan2); + fail("should throw exception"); + } catch (DataTypeMismatchException e) { + assertEquals( + "DataType mismatch, Insert measurement s2 type DOUBLE, metadata tree type INT64", + e.getMessage()); + } catch (Exception e2) { + fail("throw wrong exception"); + } + + IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateAlignedTimeseriesAndInsertNotAlignedData() { + MManager manager = IoTDB.metaManager; + + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + CreateAlignedTimeSeriesPlan plan = + new CreateAlignedTimeSeriesPlan( + new PartialPath("root.laptop.d1.aligned_device"), + Arrays.asList("s1", "s2", "s3"), + Arrays.asList( + TSDataType.valueOf("FLOAT"), + TSDataType.valueOf("INT64"), + TSDataType.valueOf("INT32")), + Arrays.asList( + TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")), + Arrays.asList(compressionType, compressionType, compressionType), + null); + + manager.createAlignedTimeSeriesEntry(plan); + + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = + new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32}; + + String[] columns = new String[3]; + columns[0] = 2.0 + ""; + columns[1] = 10000 + ""; + columns[2] = 100 + ""; + + // non aligned plan + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2", "s3"}, + dataTypes, + columns, + false); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + try { + idTable.getSeriesSchemas(insertRowPlan); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e2) { + fail("throw wrong exception"); + } + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateTimeseriesAndInsert() { + MManager manager = IoTDB.metaManager; + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + manager.createTimeseries( + new PartialPath("root.laptop.d1.s0"), + TSDataType.valueOf("INT32"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + long time = 1L; + String[] columns = new String[1]; + columns[0] = 2 + ""; + + // correct insert plan + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1"), + time, + new String[] {"s0"}, + new TSDataType[] {TSDataType.INT32}, + columns); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan); + assertEquals(insertRowPlan.getMeasurementMNodes()[0].getSchema().getType(), TSDataType.INT32); + assertEquals(0, insertRowPlan.getFailedMeasurementNumber()); + + // construct an insertRowPlan with mismatched data type + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1"), + time, + new String[] {"s0"}, + new TSDataType[] {TSDataType.FLOAT}, + columns); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // get series schema + idTable.getSeriesSchemas(insertRowPlan2); + assertNull(insertRowPlan2.getMeasurementMNodes()[0]); + assertEquals(1, insertRowPlan2.getFailedMeasurementNumber()); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateTimeseriesAndInsertWithAlignedData() { + MManager manager = IoTDB.metaManager; + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s1"), + TSDataType.valueOf("INT32"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s2"), + TSDataType.valueOf("INT64"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e) { + fail("throw wrong exception"); + } + } + + @Test + public void testInsertAndAutoCreate() { + MManager manager = IoTDB.metaManager; + try { + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + false); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + + // check mmanager + IMeasurementMNode s1Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1")); + assertEquals("s1", s1Node.getName()); + assertEquals(TSDataType.INT32, s1Node.getSchema().getType()); + IMeasurementMNode s2Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2")); + assertEquals("s2", s2Node.getName()); + assertEquals(TSDataType.INT64, s2Node.getSchema().getType()); + + // insert type mismatch data + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + false); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan2); + + assertNull(insertRowPlan2.getMeasurementMNodes()[0]); + assertEquals(insertRowPlan.getMeasurementMNodes()[1].getSchema().getType(), TSDataType.INT64); + assertEquals(1, insertRowPlan2.getFailedMeasurementNumber()); + + // insert aligned data + InsertRowPlan insertRowPlan3 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + true); + insertRowPlan3.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + try { + idTable.getSeriesSchemas(insertRowPlan3); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e) { + fail("throw wrong exception"); + } + } catch (MetadataException | StorageEngineException e) { + e.printStackTrace(); + fail("throw exception"); + } + } + + @Test + public void testAlignedInsertAndAutoCreate() { + MManager manager = IoTDB.metaManager; + try { + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + + // check mmanager + IMeasurementMNode s1Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s1")); + assertEquals("s1", s1Node.getName()); + assertEquals(TSDataType.INT32, s1Node.getSchema().getType()); + IMeasurementMNode s2Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s2")); + assertEquals("s2", s2Node.getName()); + assertEquals(TSDataType.INT64, s2Node.getSchema().getType()); + assertTrue(s2Node.getParent().isAligned()); + + // insert type mismatch data + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + true); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan2); + + assertNull(insertRowPlan2.getMeasurementMNodes()[0]); + assertEquals(insertRowPlan.getMeasurementMNodes()[1].getSchema().getType(), TSDataType.INT64); + assertEquals(1, insertRowPlan2.getFailedMeasurementNumber()); + + // insert non-aligned data + InsertRowPlan insertRowPlan3 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT64, TSDataType.INT64}, + columns, + false); + insertRowPlan3.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + try { + idTable.getSeriesSchemas(insertRowPlan3); + fail("should throw exception"); + } catch (MetadataException e) { + assertEquals( + "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan", + e.getMessage()); + } catch (Exception e) { + fail("throw wrong exception"); + } + } catch (MetadataException | StorageEngineException e) { + e.printStackTrace(); + fail("throw exception"); + } + } + + @Test + public void testTriggerAndInsert() { + MManager manager = IoTDB.metaManager; + try { + long time = 1L; + + manager.setStorageGroup(new PartialPath("root.laptop")); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s1"), + TSDataType.valueOf("INT32"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + manager.createTimeseries( + new PartialPath("root.laptop.d1.non_aligned_device.s2"), + TSDataType.valueOf("INT64"), + TSEncoding.valueOf("RLE"), + compressionType, + Collections.emptyMap()); + + Planner processor = new Planner(); + + String sql = + "CREATE TRIGGER trigger1 BEFORE INSERT ON root.laptop.d1.non_aligned_device.s1 AS 'org.apache.iotdb.db.metadata.id_table.trigger_example.Counter'"; + + CreateTriggerPlan plan = (CreateTriggerPlan) processor.parseSQLToPhysicalPlan(sql); + + TriggerRegistrationService.getInstance().register(plan); + + TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + String[] columns = new String[2]; + columns[0] = "1"; + columns[1] = "2"; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.non_aligned_device"), + time, + new String[] {"s1", "s2"}, + dataTypes, + columns, + false); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + idTable.getSeriesSchemas(insertRowPlan); + + // check mmanager + IMeasurementMNode s1Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1")); + assertEquals("s1", s1Node.getName()); + assertEquals(TSDataType.INT32, s1Node.getSchema().getType()); + assertNotNull(s1Node.getTriggerExecutor()); + + IMeasurementMNode s2Node = + manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2")); + assertEquals("s2", s2Node.getName()); + assertEquals(TSDataType.INT64, s2Node.getSchema().getType()); + assertNull(s2Node.getTriggerExecutor()); + + } catch (MetadataException | StorageEngineException | QueryProcessException e) { + e.printStackTrace(); + fail("throw exception"); + } + } +}
