This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch xkf_id_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 233bd92a53fa9bd9a69872e15fdb8d4dbe432df7
Merge: a5c6a1e 7aabea4
Author: 151250176 <[email protected]>
AuthorDate: Wed Dec 8 11:49:36 2021 +0800

    Merge branch 'master' of git://github.com/apache/iotdb into xkf_id_table
    
    # Conflicts:
    #   server/src/main/java/org/apache/iotdb/db/metadata/MManager.java

 .../protocol/influxdb}/input/InfluxLineProtocol.g4 |   2 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |   2 +-
 .../iotdb/cluster/server/ClusterTSServiceImpl.java |   2 +-
 docker/ReadMe.md                                   |   1 +
 docker/src/main/Dockerfile-single                  |   1 +
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  53 ++++-
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  58 ++++-
 influxdb-protocol/pom.xml                          |  85 ++++----
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |  87 ++------
 .../iotdb/influxdb/IoTDBInfluxDBFactory.java       |   2 +-
 .../iotdb/influxdb/example/InfluxDBExample.java    |  36 +--
 .../protocol/constant/InfluxDBConstant.java        |   2 -
 .../iotdb/influxdb/protocol/dto/IoTDBPoint.java    |  65 ------
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |  52 ++++-
 .../protocol/impl/IoTDBInfluxDBService.java        | 131 +++--------
 .../influxdb/protocol/meta/MetaManagerHolder.java  |  54 -----
 .../influxdb/protocol/util/DataTypeUtils.java      |  86 --------
 .../iotdb/influxdb/session/InfluxDBSession.java    | 242 +++++++++++++++++++++
 .../influxdb/integration/IoTDBInfluxDBIT.java      |  94 +-------
 pom.xml                                            |   2 +
 server/pom.xml                                     |  10 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   2 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  35 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../org/apache/iotdb/db/metadata/MManager.java     |  68 +++---
 .../apache/iotdb/db/metadata/id_table/IDTable.java |  16 +-
 .../apache/iotdb/db/metrics/ui/MetricsPage.java    |   2 +-
 .../influxdb}/constant/InfluxDBConstant.java       |   5 +-
 .../iotdb/db/protocol/influxdb/dto/IoTDBPoint.java | 128 ++++++-----
 .../protocol/influxdb}/input/InfluxLineParser.java |  21 +-
 .../db/protocol/influxdb}/meta/MetaManager.java    | 127 +++++++----
 .../db/protocol/influxdb}/meta/TagInfoRecords.java |  30 ++-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  40 +++-
 .../iotdb/db/rest/impl/RestApiServiceImpl.java     |   2 +-
 .../{RPCService.java => InfluxDBRPCService.java}   |  46 ++--
 .../iotdb/db/service/InfluxDBRPCServiceMBean.java  |  10 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   4 +
 .../org/apache/iotdb/db/service/RPCService.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/service/StaticResps.java   |   4 +-
 .../db/service/basic/BasicServiceProvider.java     |   2 +-
 .../handler/InfluxDBServiceThriftHandler.java      |  59 +++++
 .../handler}/RPCServiceThriftHandler.java          |   4 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   | 155 +++++++++++++
 .../service/{ => thrift/impl}/TSServiceImpl.java   |   6 +-
 .../org/apache/iotdb/db/utils/DataTypeUtils.java   | 149 +++++++++++++
 .../org/apache/iotdb/db/utils/ParameterUtils.java  |  20 +-
 .../apache/iotdb/db/utils/RandomDeleteCache.java   |  76 -------
 .../iotdb/db/metadata/id_table/IDTableTest.java    |  18 +-
 .../influxdb}/input/InfluxLineParserTest.java      |   3 +-
 service-rpc/pom.xml                                |   5 +
 .../iotdb/rpc/InfluxDBSynchronizedHandler.java     |  56 +++++
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  38 ++++
 .../iotdb/rpc/StatementExecutionException.java     |   7 +
 .../java/org/apache/iotdb/session/Session.java     |  88 +-------
 .../apache/iotdb/session/util/SessionUtils.java    |  90 ++++++++
 spark-iotdb-connector/pom.xml                      |  36 +--
 .../org/apache/iotdb/spark/db/Converter.scala      |   6 +-
 .../org/apache/iotdb/spark/db/DefaultSource.scala  |  30 ++-
 .../org/apache/iotdb/spark/db/IoTDBOptions.scala   |   2 +-
 .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala |   7 +-
 .../iotdb/spark/db/tools/DataFrameTools.java       | 162 ++++++++++++++
 .../org/apache/iotdb/spark/db/IoTDBTest.scala      |  32 ++-
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 117 ++++++++++
 thrift-influxdb/README.md                          |  22 ++
 {service-rpc => thrift-influxdb}/pom.xml           |  90 ++------
 thrift-influxdb/src/main/thrift/influxdb.thrift    |  92 ++++++++
 67 files changed, 1914 insertions(+), 1073 deletions(-)

diff --cc 
server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
index d69ba49,0000000..c5a895e
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
@@@ -1,377 -1,0 +1,379 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.metadata.id_table;
 +
- import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
- 
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 +import org.apache.iotdb.db.exception.metadata.MetadataException;
 +import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 +import org.apache.iotdb.db.metadata.id_table.entry.DeviceEntry;
 +import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory;
 +import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID;
 +import org.apache.iotdb.db.metadata.id_table.entry.InsertMeasurementMNode;
 +import org.apache.iotdb.db.metadata.id_table.entry.SchemaEntry;
 +import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID;
 +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 +import org.apache.iotdb.db.metadata.path.PartialPath;
 +import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 +import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
++
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
++
 +/** An id table belongs to a storage group and maps each timeseries path to its schema. */
 +public class IDTable {
 +
 +  // number of table slot
 +  private static final int NUM_OF_SLOTS = 256;
 +  /** logger */
 +  private static final Logger logger = LoggerFactory.getLogger(IDTable.class);
 +  /**
 +   * 256 hash maps, used to avoid rehash performance issues and lock competition. Each map is:
 +   * device ID -> (measurement name -> schema entry)
 +   */
 +  private Map<IDeviceID, DeviceEntry>[] idTables;
 +  /** disk schema manager to manage disk schema entry */
 +  private DiskSchemaManager diskSchemaManager;
 +  /** iotdb config */
 +  protected static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 +
 +  /** Builds an empty id table with one hash map per slot. */
 +  public IDTable() {
 +    idTables = new Map[NUM_OF_SLOTS];
 +    // give every slot its own map up front so lookups never meet a null slot
 +    Arrays.setAll(idTables, slot -> new HashMap<>());
 +  }
 +
 +  /**
 +   * create aligned timeseries
 +   *
 +   * @param plan create aligned timeseries plan
 +   * @throws MetadataException if the device is not aligned, throw it
 +   */
 +  public synchronized void 
createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
 +      throws MetadataException {
 +    DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true);
 +
 +    for (int i = 0; i < plan.getMeasurements().size(); i++) {
 +      SchemaEntry schemaEntry =
 +          new SchemaEntry(
 +              plan.getDataTypes().get(i), plan.getEncodings().get(i), 
plan.getCompressors().get(i));
 +      deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry);
 +    }
 +  }
 +
 +  /**
 +   * Registers the single (non-aligned) timeseries described by the plan.
 +   *
 +   * @param plan create timeseries plan
 +   * @throws MetadataException if the device already exists in this table and is aligned
 +   */
 +  public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
 +    DeviceEntry plainDevice = getDeviceEntry(plan.getPath().getDevicePath(), false);
 +    SchemaEntry entry = new SchemaEntry(plan.getDataType(), plan.getEncoding(), plan.getCompressor());
 +    String measurement = plan.getPath().getMeasurement();
 +    plainDevice.putSchemaEntry(measurement, entry);
 +  }
 +
 +  /**
 +   * Check that every timeseries of the insert plan exists (creating it when needed) and fill the
 +   * plan's measurement mnode array, one slot per measurement.
 +   *
 +   * <p>When partial insert is enabled, a measurement that fails is marked as failed on the plan
 +   * and the loop continues; otherwise the exception is rethrown.
 +   *
 +   * @param plan insert plan
 +   * @return reusable device id
 +   * @throws MetadataException if insert plan's aligned value is inconsistent with device, or if a
 +   *     measurement fails while partial insert is disabled
 +   */
 +  public synchronized IDeviceID getSeriesSchemas(InsertPlan plan) throws 
MetadataException {
 +    PartialPath devicePath = plan.getDeviceId();
 +    String[] measurementList = plan.getMeasurements();
 +    IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
 +
 +    // 1. get device entry and check align
 +    DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned());
 +
 +    // 2. get schema of each measurement
 +    for (int i = 0; i < measurementList.length; i++) {
 +      try {
 +        // get MeasurementMNode, auto create if absent
 +        try {
 +          IMeasurementMNode measurementMNode =
 +              getOrCreateMeasurementIfNotExist(deviceEntry, plan, i);
 +
 +          checkDataTypeMatch(plan, i, measurementMNode.getSchema().getType());
 +          measurementMNodes[i] = measurementMNode;
 +        } catch (DataTypeMismatchException mismatchException) {
 +          // a type mismatch is handled here first; when it is rethrown it reaches the outer
 +          // MetadataException handler below
 +          if (!config.isEnablePartialInsert()) {
 +            throw mismatchException;
 +          } else {
 +            // mark failed measurement
 +            plan.markFailedMeasurementInsertion(i, mismatchException);
 +          }
 +        }
 +      } catch (MetadataException e) {
 +        // same message in both branches; only the log level differs in cluster mode
 +        if (IoTDB.isClusterMode()) {
 +          logger.debug(
 +              "meet error when check {}.{}, message: {}",
 +              devicePath,
 +              measurementList[i],
 +              e.getMessage());
 +        } else {
 +          logger.warn(
 +              "meet error when check {}.{}, message: {}",
 +              devicePath,
 +              measurementList[i],
 +              e.getMessage());
 +        }
 +        if (config.isEnablePartialInsert()) {
 +          // mark failed measurement
 +          plan.markFailedMeasurementInsertion(i, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +    }
 +
 +    return deviceEntry.getDeviceID();
 +  }
 +
 +  /**
 +   * Records a new latest flushed time on the schema entry of one timeseries.
 +   *
 +   * @param timeseriesID timeseries id
 +   * @param flushTime latest flushed time
 +   * @throws MetadataException if this timeseries does not exist
 +   */
 +  public synchronized void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime)
 +      throws MetadataException {
 +    SchemaEntry entry = getSchemaEntry(timeseriesID);
 +    entry.updateLastedFlushTime(flushTime);
 +  }
 +
 +  /**
 +   * get latest flushed time of one timeseries
 +   *
 +   * @param timeseriesID timeseries id
 +   * @return latest flushed time of one timeseries
 +   * @throws MetadataException if this timeseries does not exist
 +   */
 +  public synchronized long getLatestFlushedTime(TimeseriesID timeseriesID)
 +      throws MetadataException {
 +    return getSchemaEntry(timeseriesID).getFlushTime();
 +  }
 +
 +  /**
 +   * register trigger to the timeseries
 +   *
 +   * @param fullPath full path of the timeseries
 +   * @param measurementMNode the timeseries measurement mnode
 +   * @throws MetadataException if the timeseries does not exist in this id table, or if the
 +   *     device's aligned value is inconsistent with the mnode's parent
 +   */
 +  public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
 +      throws MetadataException {
 +    boolean isAligned = measurementMNode.getParent().isAligned();
 +    DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
 +
 +    SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(fullPath.getMeasurement());
 +    if (schemaEntry == null) {
 +      // fail with the documented MetadataException instead of a NullPointerException
 +      throw new PathNotExistException(fullPath.toString());
 +    }
 +    schemaEntry.setUsingTrigger();
 +  }
 +
 +  /**
 +   * deregister trigger from the timeseries
 +   *
 +   * @param fullPath full path of the timeseries
 +   * @param measurementMNode the timeseries measurement mnode
 +   * @throws MetadataException if the timeseries does not exist in this id table, or if the
 +   *     device's aligned value is inconsistent with the mnode's parent
 +   */
 +  public synchronized void deregisterTrigger(
 +      PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
 +    boolean isAligned = measurementMNode.getParent().isAligned();
 +    DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
 +
 +    SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(fullPath.getMeasurement());
 +    if (schemaEntry == null) {
 +      // fail with the documented MetadataException instead of a NullPointerException
 +      throw new PathNotExistException(fullPath.toString());
 +    }
 +    schemaEntry.setUnUsingTrigger();
 +  }
 +
 +  /**
 +   * Look up the schema entry of one measurement of the insert plan. If it is absent and auto
 +   * create schema is enabled, ask MManager to create the timeseries first and look it up again.
 +   *
 +   * @param deviceEntry device entry the measurement belongs to
 +   * @param plan insert plan
 +   * @param loc index of the measurement inside the plan
 +   * @return measurement MNode wrapping the schema entry, carrying the trigger executor when the
 +   *     timeseries is using a trigger; never null
 +   * @throws MetadataException if the timeseries does not exist and auto create is disabled, or if
 +   *     it is still missing from this table after the creation attempt
 +   */
 +  private IMeasurementMNode getOrCreateMeasurementIfNotExist(
 +      DeviceEntry deviceEntry, InsertPlan plan, int loc) throws MetadataException {
 +    String measurementName = plan.getMeasurements()[loc];
 +    PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), measurementName);
 +
 +    SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName);
 +
 +    // if not exist, we create it
 +    if (schemaEntry == null) {
 +      if (!config.isAutoCreateSchemaEnabled()) {
 +        throw new PathNotExistException(seriesKey.toString());
 +      }
 +
 +      // create new timeseries in mmanager
 +      try {
 +        if (plan.isAligned()) {
 +          // create aligned timeseries: one default encoding/compressor per data type of the plan
 +          List<TSEncoding> encodings = new ArrayList<>();
 +          List<CompressionType> compressors = new ArrayList<>();
 +          for (TSDataType dataType : plan.getDataTypes()) {
 +            encodings.add(getDefaultEncoding(dataType));
 +            compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
 +          }
 +
 +          CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
 +              new CreateAlignedTimeSeriesPlan(
 +                  plan.getDeviceId(),
 +                  Arrays.asList(plan.getMeasurements()),
 +                  Arrays.asList(plan.getDataTypes()),
 +                  encodings,
 +                  compressors,
 +                  null);
 +
 +          IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan);
 +        } else {
 +          // create normal timeseries
 +          CreateTimeSeriesPlan createTimeSeriesPlan =
 +              new CreateTimeSeriesPlan(
 +                  seriesKey,
 +                  plan.getDataTypes()[loc],
 +                  getDefaultEncoding(plan.getDataTypes()[loc]),
 +                  TSFileDescriptor.getInstance().getConfig().getCompressor(),
 +                  null,
 +                  null,
 +                  null,
 +                  null);
 +
 +          IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1);
 +        }
 +      } catch (MetadataException e) {
 +        // keep the best-effort behaviour (the entry may exist despite the failure), but log the
 +        // cause so the failure can be diagnosed
 +        logger.error("create timeseries failed, path is:{}", seriesKey, e);
 +      }
 +
 +      // NOTE(review): the creation call is presumably expected to register the entry in this
 +      // table; confirm against MManager
 +      schemaEntry = deviceEntry.getSchemaEntry(measurementName);
 +      if (schemaEntry == null) {
 +        // still missing: raise a MetadataException the caller can handle (e.g. partial insert)
 +        // instead of dereferencing null below
 +        throw new PathNotExistException(seriesKey.toString());
 +      }
 +    }
 +
 +    // timeseries is using trigger, we should get trigger from mmanager
 +    if (schemaEntry.isUsingTrigger()) {
 +      IMeasurementMNode measurementMNode = IoTDB.metaManager.getMeasurementMNode(seriesKey);
 +      return new InsertMeasurementMNode(
 +          measurementName, schemaEntry, measurementMNode.getTriggerExecutor());
 +    }
 +
 +    return new InsertMeasurementMNode(measurementName, schemaEntry);
 +  }
 +
 +  /**
 +   * Get the device entry for a device path, creating and storing a new entry when the device is
 +   * not in the table yet, and check that its aligned flag agrees with the caller's.
 +   *
 +   * @param deviceName device name of the time series
 +   * @param isAligned whether the insert plan is aligned; also used as the aligned flag of a newly
 +   *     created entry
 +   * @return device entry of the timeseries
 +   * @throws MetadataException if an existing entry's aligned flag differs from {@code isAligned}
 +   */
 +  private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean 
isAligned)
 +      throws MetadataException {
 +    IDeviceID deviceID = 
DeviceIDFactory.getInstance().getDeviceID(deviceName);
 +    int slot = calculateSlot(deviceID);
 +
 +    DeviceEntry deviceEntry = idTables[slot].get(deviceID);
 +    // new device: register it with the caller's aligned flag, no consistency check needed
 +    if (deviceEntry == null) {
 +      deviceEntry = new DeviceEntry(deviceID);
 +      deviceEntry.setAligned(isAligned);
 +      idTables[slot].put(deviceID, deviceEntry);
 +
 +      return deviceEntry;
 +    }
 +
 +    // check aligned
 +    if (deviceEntry.isAligned() != isAligned) {
 +      throw new MetadataException(
 +          String.format(
 +              "Timeseries under path [%s]'s align value is [%b], which is not 
consistent with insert plan",
 +              deviceName, deviceEntry.isAligned()));
 +    }
 +
 +    // reuse device entry in map
 +    return deviceEntry;
 +  }
 +
 +  /**
 +   * map a device id to the index of the slot that stores it
 +   *
 +   * @param deviceID device id
 +   * @return slot number
 +   */
 +  private int calculateSlot(IDeviceID deviceID) {
 +    int hash = deviceID.hashCode();
 +    // Math.abs(Integer.MIN_VALUE) stays negative, but its remainder is 0 with 256 slots
 +    int nonNegativeHash = Math.abs(hash);
 +    return nonNegativeHash % NUM_OF_SLOTS;
 +  }
 +
 +  /**
 +   * Get the schema entry of a timeseries by looking up its device entry in the device's slot and
 +   * then the measurement inside that device entry.
 +   *
 +   * @param timeseriesID the timeseries ID
 +   * @return schema entry of the timeseries
 +   * @throws MetadataException if the device or the measurement is not in this table
 +   */
 +  private SchemaEntry getSchemaEntry(TimeseriesID timeseriesID) throws 
MetadataException {
 +    IDeviceID deviceID = timeseriesID.getDeviceID();
 +    int slot = calculateSlot(deviceID);
 +
 +    // NOTE(review): the messages below say "update", but this helper is also called by
 +    // getLatestFlushedTime, which only reads
 +    DeviceEntry deviceEntry = idTables[slot].get(deviceID);
 +    if (deviceEntry == null) {
 +      throw new MetadataException(
 +          "update non exist timeseries's latest flushed time, timeseries id 
is: " + timeseriesID);
 +    }
 +
 +    SchemaEntry schemaEntry = 
deviceEntry.getSchemaEntry(timeseriesID.getMeasurement());
 +    if (schemaEntry == null) {
 +      throw new MetadataException(
 +          "update non exist timeseries's latest flushed time, timeseries id 
is: " + timeseriesID);
 +    }
 +
 +    return schemaEntry;
 +  }
 +
 +  /**
 +   * Check that the data type the insert plan declares for one measurement equals the given
 +   * schema data type; log a warning and throw when they differ. (Taken from MManager.)
 +   *
 +   * @param plan insert plan
 +   * @param loc index of the measurement inside the plan
 +   * @param dataType data type to compare the plan's type against
 +   * @throws MetadataException a DataTypeMismatchException when the two types differ
 +   */
 +  private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType 
dataType)
 +      throws MetadataException {
 +    TSDataType insertDataType = plan.getDataTypes()[loc];
 +    if (dataType != insertDataType) {
 +      String measurement = plan.getMeasurements()[loc];
 +      logger.warn(
 +          "DataType mismatch, Insert measurement {} type {}, metadata tree 
type {}",
 +          measurement,
 +          insertDataType,
 +          dataType);
 +      throw new DataTypeMismatchException(measurement, insertDataType, 
dataType);
 +    }
 +  }
 +}
diff --cc 
server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
index 60d0774,0000000..b41fc42
mode 100644,000000..100644
--- 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
@@@ -1,561 -1,0 +1,563 @@@
 +/// *
 +// * Licensed to the Apache Software Foundation (ASF) under one
 +// * or more contributor license agreements.  See the NOTICE file
 +// * distributed with this work for additional information
 +// * regarding copyright ownership.  The ASF licenses this file
 +// * to you under the Apache License, Version 2.0 (the
 +// * "License"); you may not use this file except in compliance
 +// * with the License.  You may obtain a copy of the License at
 +// *
 +// *     http://www.apache.org/licenses/LICENSE-2.0
 +// *
 +// * Unless required by applicable law or agreed to in writing,
 +// * software distributed under the License is distributed on an
 +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +// * KIND, either express or implied.  See the License for the
 +// * specific language governing permissions and limitations
 +// * under the License.
 +// */
 +
 +package org.apache.iotdb.db.metadata.id_table;
 +
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertNull;
- import static org.junit.Assert.assertTrue;
- import static org.junit.Assert.fail;
- 
- import java.util.Arrays;
- import java.util.Collections;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.engine.StorageEngine;
 +import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 +import org.apache.iotdb.db.exception.StorageEngineException;
 +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 +import org.apache.iotdb.db.exception.metadata.MetadataException;
 +import org.apache.iotdb.db.exception.query.QueryProcessException;
 +import org.apache.iotdb.db.metadata.MManager;
 +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 +import org.apache.iotdb.db.metadata.path.PartialPath;
 +import org.apache.iotdb.db.qp.Planner;
 +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 +import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
 +import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.db.utils.EnvironmentUtils;
 +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
++
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
++import java.util.Arrays;
++import java.util.Collections;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
++import static org.junit.Assert.assertNull;
++import static org.junit.Assert.assertTrue;
++import static org.junit.Assert.fail;
++
 +public class IDTableTest {
 +
 +  private CompressionType compressionType;
 +
 +  @Before
 +  public void setUp() {
 +    compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
 +    EnvironmentUtils.envSetUp();
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    EnvironmentUtils.cleanEnv();
 +  }
 +
 +  @Test
 +  public void testCreateAlignedTimeseriesAndInsert() {
 +    MManager manager = IoTDB.metaManager;
 +
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      CreateAlignedTimeSeriesPlan plan =
 +          new CreateAlignedTimeSeriesPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              Arrays.asList("s1", "s2", "s3"),
 +              Arrays.asList(
 +                  TSDataType.valueOf("FLOAT"),
 +                  TSDataType.valueOf("INT64"),
 +                  TSDataType.valueOf("INT32")),
 +              Arrays.asList(
 +                  TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), 
TSEncoding.valueOf("RLE")),
 +              Arrays.asList(compressionType, compressionType, 
compressionType),
 +              null);
 +
 +      manager.createAlignedTimeSeriesEntry(plan);
 +
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new 
PartialPath("root.laptop")).getIdTable();
 +
 +      // construct an insertRowPlan whose data types match the created timeseries
 +      long time = 1L;
 +      TSDataType[] dataTypes =
 +          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, 
TSDataType.INT32};
 +
 +      String[] columns = new String[3];
 +      columns[0] = 2.0 + "";
 +      columns[1] = 10000 + "";
 +      columns[2] = 100 + "";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2", "s3"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // with type mismatch
 +      dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE, 
TSDataType.INT32};
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2", "s3"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // we should throw type mismatch exception here
 +      try {
 +        
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
 +        idTable.getSeriesSchemas(insertRowPlan2);
 +        fail("should throw exception");
 +      } catch (DataTypeMismatchException e) {
 +        assertEquals(
 +            "DataType mismatch, Insert measurement s2 type DOUBLE, metadata 
tree type INT64",
 +            e.getMessage());
 +      } catch (Exception e2) {
 +        fail("throw wrong exception");
 +      }
 +
 +      IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true);
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateAlignedTimeseriesAndInsertNotAlignedData() {
 +    MManager manager = IoTDB.metaManager;
 +
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      CreateAlignedTimeSeriesPlan plan =
 +          new CreateAlignedTimeSeriesPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              Arrays.asList("s1", "s2", "s3"),
 +              Arrays.asList(
 +                  TSDataType.valueOf("FLOAT"),
 +                  TSDataType.valueOf("INT64"),
 +                  TSDataType.valueOf("INT32")),
 +              Arrays.asList(
 +                  TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), 
TSEncoding.valueOf("RLE")),
 +              Arrays.asList(compressionType, compressionType, 
compressionType),
 +              null);
 +
 +      manager.createAlignedTimeSeriesEntry(plan);
 +
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new 
PartialPath("root.laptop")).getIdTable();
 +
 +      // construct an insertRowPlan whose data types match the created timeseries
 +      long time = 1L;
 +      TSDataType[] dataTypes =
 +          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, 
TSDataType.INT32};
 +
 +      String[] columns = new String[3];
 +      columns[0] = 2.0 + "";
 +      columns[1] = 10000 + "";
 +      columns[2] = 100 + "";
 +
 +      // non aligned plan
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2", "s3"},
 +              dataTypes,
 +              columns,
 +              false);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      try {
 +        idTable.getSeriesSchemas(insertRowPlan);
 +        fail("should throw exception");
 +      } catch (MetadataException e) {
 +        assertEquals(
 +            "Timeseries under path [root.laptop.d1.aligned_device]'s align 
value is [true], which is not consistent with insert plan",
 +            e.getMessage());
 +      } catch (Exception e2) {
 +        fail("throw wrong exception");
 +      }
 +
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateTimeseriesAndInsert() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.s0"),
 +          TSDataType.valueOf("INT32"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new 
PartialPath("root.laptop")).getIdTable();
 +
 +      long time = 1L;
 +      String[] columns = new String[1];
 +      columns[0] = 2 + "";
 +
 +      // correct insert plan
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1"),
 +              time,
 +              new String[] {"s0"},
 +              new TSDataType[] {TSDataType.INT32},
 +              columns);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +      
assertEquals(insertRowPlan.getMeasurementMNodes()[0].getSchema().getType(), 
TSDataType.INT32);
 +      assertEquals(0, insertRowPlan.getFailedMeasurementNumber());
 +
 +      // construct an insertRowPlan with mismatched data type
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1"),
 +              time,
 +              new String[] {"s0"},
 +              new TSDataType[] {TSDataType.FLOAT},
 +              columns);
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // get series schema
 +      idTable.getSeriesSchemas(insertRowPlan2);
 +      assertNull(insertRowPlan2.getMeasurementMNodes()[0]);
 +      assertEquals(1, insertRowPlan2.getFailedMeasurementNumber());
 +
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail(e.getMessage());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateTimeseriesAndInsertWithAlignedData() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      manager.setStorageGroup(new PartialPath("root.laptop"));
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.non_aligned_device.s1"),
 +          TSDataType.valueOf("INT32"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +      manager.createTimeseries(
 +          new PartialPath("root.laptop.d1.non_aligned_device.s2"),
 +          TSDataType.valueOf("INT64"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      // construct an aligned insertRowPlan for the non-aligned device (data types match)
 +      long time = 1L;
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, 
TSDataType.INT64};
 +
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // call getSeriesSchemasAndReadLockDevice
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new 
PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +      fail("should throw exception");
 +    } catch (MetadataException e) {
 +      assertEquals(
 +          "Timeseries under path [root.laptop.d1.non_aligned_device]'s align 
value is [false], which is not consistent with insert plan",
 +          e.getMessage());
 +    } catch (Exception e) {
 +      fail("throw wrong exception");
 +    }
 +  }
 +
 +  /**
 +   * Exercises {@code IDTable.getSeriesSchemas} for a device whose timeseries are not created
 +   * explicitly by this test:
 +   *
 +   * <ol>
 +   *   <li>a non-aligned plan with INT32/INT64 columns, after which MManager must expose s1 and s2
 +   *       with those types;
 +   *   <li>a non-aligned plan declaring s1 as INT64, which must leave s1's MNode slot null, fill
 +   *       s2's slot and report exactly one failed measurement;
 +   *   <li>an aligned plan on the same device, which must be rejected with a {@link
 +   *       MetadataException}.
 +   * </ol>
 +   */
 +  @Test
 +  public void testInsertAndAutoCreate() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      // construct a non-aligned insertRowPlan for series this test has not created beforehand
 +      long time = 1L;
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
 +
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              false);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // resolve the series schemas through the storage group's id table
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // check mmanager
 +      IMeasurementMNode s1Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s1"));
 +      assertEquals("s1", s1Node.getName());
 +      assertEquals(TSDataType.INT32, s1Node.getSchema().getType());
 +      IMeasurementMNode s2Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.non_aligned_device.s2"));
 +      assertEquals("s2", s2Node.getName());
 +      assertEquals(TSDataType.INT64, s2Node.getSchema().getType());
 +
 +      // insert type mismatch data: s1 is declared INT64 here but was INT32 above
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              false);
 +      // size the MNode array from this plan, not from the first one
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan2.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan2);
 +
 +      // the mismatched column must be dropped while the matching one is resolved; both checks
 +      // have to look at insertRowPlan2, otherwise they only re-verify the first plan
 +      assertNull(insertRowPlan2.getMeasurementMNodes()[0]);
 +      assertEquals(
 +          TSDataType.INT64, insertRowPlan2.getMeasurementMNodes()[1].getSchema().getType());
 +      assertEquals(1, insertRowPlan2.getFailedMeasurementNumber());
 +
 +      // insert aligned data
 +      InsertRowPlan insertRowPlan3 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              true);
 +      insertRowPlan3.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan3.getMeasurements().length]);
 +
 +      try {
 +        idTable.getSeriesSchemas(insertRowPlan3);
 +        fail("should throw exception");
 +      } catch (MetadataException e) {
 +        assertEquals(
 +            "Timeseries under path [root.laptop.d1.non_aligned_device]'s align value is [false], which is not consistent with insert plan",
 +            e.getMessage());
 +      } catch (Exception e) {
 +        // keep the unexpected exception visible in the test report
 +        fail("throw wrong exception: " + e);
 +      }
 +    } catch (MetadataException | StorageEngineException e) {
 +      e.printStackTrace();
 +      fail("throw exception: " + e.getMessage());
 +    }
 +  }
 +
 +  /**
 +   * Aligned counterpart of the auto-create test. It exercises {@code IDTable.getSeriesSchemas}
 +   * for a device whose timeseries are not created explicitly by this test:
 +   *
 +   * <ol>
 +   *   <li>an aligned plan with INT32/INT64 columns, after which MManager must expose s1 and s2
 +   *       with those types under a parent node reporting {@code isAligned()};
 +   *   <li>an aligned plan declaring s1 as INT64, which must leave s1's MNode slot null, fill s2's
 +   *       slot and report exactly one failed measurement;
 +   *   <li>a non-aligned plan on the same device, which must be rejected with a {@link
 +   *       MetadataException}.
 +   * </ol>
 +   */
 +  @Test
 +  public void testAlignedInsertAndAutoCreate() {
 +    MManager manager = IoTDB.metaManager;
 +    try {
 +      // construct an aligned insertRowPlan for series this test has not created beforehand
 +      long time = 1L;
 +      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
 +
 +      String[] columns = new String[2];
 +      columns[0] = "1";
 +      columns[1] = "2";
 +
 +      InsertRowPlan insertRowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              dataTypes,
 +              columns,
 +              true);
 +      insertRowPlan.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
 +
 +      // resolve the series schemas through the storage group's id table
 +      IDTable idTable =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +
 +      idTable.getSeriesSchemas(insertRowPlan);
 +
 +      // check mmanager
 +      IMeasurementMNode s1Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s1"));
 +      assertEquals("s1", s1Node.getName());
 +      assertEquals(TSDataType.INT32, s1Node.getSchema().getType());
 +      IMeasurementMNode s2Node =
 +          manager.getMeasurementMNode(new PartialPath("root.laptop.d1.aligned_device.s2"));
 +      assertEquals("s2", s2Node.getName());
 +      assertEquals(TSDataType.INT64, s2Node.getSchema().getType());
 +      assertTrue(s2Node.getParent().isAligned());
 +
 +      // insert type mismatch data: s1 is declared INT64 here but was INT32 above
 +      InsertRowPlan insertRowPlan2 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              true);
 +      // size the MNode array from this plan, not from the first one
 +      insertRowPlan2.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan2.getMeasurements().length]);
 +
 +      idTable.getSeriesSchemas(insertRowPlan2);
 +
 +      // the mismatched column must be dropped while the matching one is resolved; both checks
 +      // have to look at insertRowPlan2, otherwise they only re-verify the first plan
 +      assertNull(insertRowPlan2.getMeasurementMNodes()[0]);
 +      assertEquals(
 +          TSDataType.INT64, insertRowPlan2.getMeasurementMNodes()[1].getSchema().getType());
 +      assertEquals(1, insertRowPlan2.getFailedMeasurementNumber());
 +
 +      // insert non-aligned data
 +      InsertRowPlan insertRowPlan3 =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.aligned_device"),
 +              time,
 +              new String[] {"s1", "s2"},
 +              new TSDataType[] {TSDataType.INT64, TSDataType.INT64},
 +              columns,
 +              false);
 +      insertRowPlan3.setMeasurementMNodes(
 +          new IMeasurementMNode[insertRowPlan3.getMeasurements().length]);
 +
 +      try {
 +        idTable.getSeriesSchemas(insertRowPlan3);
 +        fail("should throw exception");
 +      } catch (MetadataException e) {
 +        assertEquals(
 +            "Timeseries under path [root.laptop.d1.aligned_device]'s align value is [true], which is not consistent with insert plan",
 +            e.getMessage());
 +      } catch (Exception e) {
 +        // keep the unexpected exception visible in the test report
 +        fail("throw wrong exception: " + e);
 +      }
 +    } catch (MetadataException | StorageEngineException e) {
 +      e.printStackTrace();
 +      fail("throw exception: " + e.getMessage());
 +    }
 +  }
 +
 +  /**
 +   * Creates s1 and s2 under root.laptop.d1.non_aligned_device, registers a BEFORE INSERT trigger
 +   * on s1 only, resolves schemas for an insert plan covering both measurements, and then checks
 +   * through MManager that s1 carries a trigger executor while s2 does not.
 +   */
 +  @Test
 +  public void testTriggerAndInsert() {
 +    MManager metaManager = IoTDB.metaManager;
 +    try {
 +      metaManager.setStorageGroup(new PartialPath("root.laptop"));
 +
 +      PartialPath s1Path = new PartialPath("root.laptop.d1.non_aligned_device.s1");
 +      metaManager.createTimeseries(
 +          s1Path,
 +          TSDataType.valueOf("INT32"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      PartialPath s2Path = new PartialPath("root.laptop.d1.non_aligned_device.s2");
 +      metaManager.createTimeseries(
 +          s2Path,
 +          TSDataType.valueOf("INT64"),
 +          TSEncoding.valueOf("RLE"),
 +          compressionType,
 +          Collections.emptyMap());
 +
 +      // parse the CREATE TRIGGER statement and register the resulting plan
 +      Planner planner = new Planner();
 +      String createTriggerSql =
 +          "CREATE TRIGGER trigger1 BEFORE INSERT ON root.laptop.d1.non_aligned_device.s1 AS 'org.apache.iotdb.db.metadata.id_table.trigger_example.Counter'";
 +      CreateTriggerPlan triggerPlan =
 +          (CreateTriggerPlan) planner.parseSQLToPhysicalPlan(createTriggerSql);
 +      TriggerRegistrationService.getInstance().register(triggerPlan);
 +
 +      // build a non-aligned insert plan covering both measurements
 +      long timestamp = 1L;
 +      String[] measurements = {"s1", "s2"};
 +      TSDataType[] types = {TSDataType.INT32, TSDataType.INT64};
 +      String[] values = {"1", "2"};
 +
 +      InsertRowPlan rowPlan =
 +          new InsertRowPlan(
 +              new PartialPath("root.laptop.d1.non_aligned_device"),
 +              timestamp,
 +              measurements,
 +              types,
 +              values,
 +              false);
 +      int measurementCount = rowPlan.getMeasurements().length;
 +      rowPlan.setMeasurementMNodes(new IMeasurementMNode[measurementCount]);
 +
 +      // resolve the series schemas through the storage group's id table
 +      IDTable table =
 +          StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable();
 +      table.getSeriesSchemas(rowPlan);
 +
 +      // s1 is the trigger target, so its node must expose a trigger executor
 +      IMeasurementMNode firstNode =
 +          metaManager.getMeasurementMNode(
 +              new PartialPath("root.laptop.d1.non_aligned_device.s1"));
 +      assertEquals("s1", firstNode.getName());
 +      assertEquals(TSDataType.INT32, firstNode.getSchema().getType());
 +      assertNotNull(firstNode.getTriggerExecutor());
 +
 +      // no trigger was registered on s2, so its node must not expose one
 +      IMeasurementMNode secondNode =
 +          metaManager.getMeasurementMNode(
 +              new PartialPath("root.laptop.d1.non_aligned_device.s2"));
 +      assertEquals("s2", secondNode.getName());
 +      assertEquals(TSDataType.INT64, secondNode.getSchema().getType());
 +      assertNull(secondNode.getTriggerExecutor());
 +
 +    } catch (MetadataException | StorageEngineException | QueryProcessException e) {
 +      e.printStackTrace();
 +      fail("throw exception");
 +    }
 +  }
 +}

Reply via email to