This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch DisableAlign in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3ee98df2f6659a55dd407a1e28d1ebadddf304b5 Merge: 8f8d570 7c0f1d5 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jan 14 16:40:41 2020 +0800 Disable align NOTICE | 2 +- NOTICE-binary | 2 +- RELEASE_NOTES.md | 28 + client-py/compile.bat | 5 +- client-py/readme.md | 2 +- client-py/src/client_example.py | 2 + .../org/apache/iotdb/client/AbstractClient.java | 2 +- .../UserGuide/3-Server/4-Config Manual.md | 37 +- .../5-Operation Manual/4-SQL Reference.md | 13 +- .../UserGuide/3-Server/4-Config Manual.md | 39 +- .../4-Client/4-Programming - Other Languages.md | 53 +- .../5-Operation Manual/4-SQL Reference.md | 13 +- .../8-System Design (Developer)/1-Hierarchy.md | 6 +- .../main/java/org/apache/iotdb/JDBCExample.java | 2 +- server/pom.xml | 12 + .../resources/conf/iotdb-engine.properties | 15 +- server/src/assembly/resources/conf/iotdb-env.bat | 31 +- server/src/assembly/resources/conf/iotdb-env.sh | 31 +- server/src/assembly/resources/conf/logback.xml | 12 +- .../src/assembly/resources/sbin/start-server.bat | 6 +- server/src/assembly/resources/sbin/start-server.sh | 6 +- .../tsfileToolSet/print-tsfile-resource-files.sh | 0 .../tools/tsfileToolSet/print-tsfile-sketch.sh | 0 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 6 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 102 ++- .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 48 +- .../org/apache/iotdb/db/conf/IoTDBConstant.java | 7 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 39 +- .../db/conf/adapter/IoTDBConfigDynamicAdapter.java | 5 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 39 +- .../iotdb/db/engine/flush/MemTableFlushTask.java | 24 +- .../iotdb/db/engine/flush/TsFileFlushPolicy.java | 2 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 8 +- .../apache/iotdb/db/engine/memtable/IMemTable.java | 5 +- .../db/engine/memtable/IWritableMemChunk.java | 16 +- .../iotdb/db/engine/memtable/WritableMemChunk.java | 80 +- .../db/engine/merge/manage/MergeResource.java | 3 +- .../db/engine/modification/ModificationFile.java | 2 +- .../io/LocalTextModificationAccessor.java | 2 +- .../engine/modification/io/ModificationWriter.java | 1 + .../engine/storagegroup/StorageGroupProcessor.java | 813 ++++++++++++++------- .../db/engine/storagegroup/TsFileProcessor.java | 85 +-- .../db/engine/storagegroup/TsFileResource.java | 7 +- .../version/SimpleFileVersionController.java | 51 +- .../java/org/apache/iotdb/db/metadata/MGraph.java | 2 +- .../java/org/apache/iotdb/db/metadata/MTree.java | 2 +- .../apache/iotdb/db/qp/constant/DatetimeUtils.java | 2 +- .../iotdb/db/qp/executor/QueryProcessExecutor.java | 52 +- .../iotdb/db/qp/physical/crud/BatchInsertPlan.java | 56 +- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 34 +- .../iotdb/db/query/context/QueryContext.java | 19 +- .../db/query/dataset/NonAlignEngineDataSet.java | 22 +- .../iotdb/db/query/executor/EngineExecutor.java | 8 +- .../org/apache/iotdb/db/rescon/MemTablePool.java | 24 +- .../java/org/apache/iotdb/db/service/IoTDB.java | 6 +- .../org/apache/iotdb/db/service/JDBCService.java | 9 +- .../apache/iotdb/db/service/MetricsService.java | 62 +- .../apache/iotdb/db/service/RegisterManager.java | 3 + .../iotdb/db/sync/conf/SyncSenderConfig.java | 18 - .../iotdb/db/sync/receiver/SyncServerManager.java | 9 +- .../iotdb/db/sync/receiver/load/FileLoader.java | 2 +- .../db/sync/receiver/load/FileLoaderManager.java | 9 +- .../db/sync/sender/manage/ISyncFileManager.java | 10 +- .../db/sync/sender/manage/SyncFileManager.java | 137 ++-- .../iotdb/db/sync/sender/transfer/ISyncClient.java | 6 +- .../iotdb/db/sync/sender/transfer/SyncClient.java | 120 +-- .../FileUtils.java} | 46 +- .../java/org/apache/iotdb/db/utils/SyncUtils.java | 10 +- .../iotdb/db/utils/datastructure/BinaryTVList.java | 30 + .../db/utils/datastructure/BooleanTVList.java | 30 + .../iotdb/db/utils/datastructure/DoubleTVList.java | 30 + .../iotdb/db/utils/datastructure/FloatTVList.java | 30 + .../iotdb/db/utils/datastructure/IntTVList.java | 30 + .../iotdb/db/utils/datastructure/LongTVList.java | 30 + .../iotdb/db/utils/datastructure/TVList.java | 30 +- .../iotdb/db/writelog/recover/LogReplayer.java | 13 +- .../writelog/recover/TsFileRecoverPerformer.java | 2 +- .../db/conf/adapter/CompressionRatioTest.java | 6 - .../adapter/IoTDBConfigDynamicAdapterTest.java | 9 +- .../db/engine/cache/DeviceMetaDataCacheTest.java | 5 +- .../iotdb/db/engine/merge/MergeTaskTest.java | 9 +- .../apache/iotdb/db/engine/merge/MergeTest.java | 10 +- .../engine/modification/DeletionFileNodeTest.java | 56 +- .../engine/modification/ModificationFileTest.java | 8 +- .../io/LocalTextModificationAccessorTest.java | 4 +- .../storagegroup/StorageGroupProcessorTest.java | 14 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 61 +- .../engine/storagegroup/TsFileProcessorTest.java | 8 +- .../version/SimpleFileVersionControllerTest.java | 4 +- .../iotdb/db/integration/IOTDBGroupByIT.java | 29 +- .../integration/IOTDBGroupByInnerIntervalIT.java | 6 - .../iotdb/db/integration/IoTDBAggregationIT.java | 42 +- .../integration/IoTDBAggregationLargeDataIT.java | 8 +- .../integration/IoTDBAggregationSmallDataIT.java | 6 - .../iotdb/db/integration/IoTDBAuthorizationIT.java | 18 +- .../db/integration/IoTDBAutoCreateSchemaIT.java | 6 - .../apache/iotdb/db/integration/IoTDBCloseIT.java | 6 - .../iotdb/db/integration/IoTDBCompleteIT.java | 6 - .../apache/iotdb/db/integration/IoTDBDaemonIT.java | 6 +- .../db/integration/IoTDBDeleteStorageGroupIT.java | 6 - .../iotdb/db/integration/IoTDBDeletionIT.java | 16 +- .../db/integration/IoTDBEngineTimeGeneratorIT.java | 5 - .../apache/iotdb/db/integration/IoTDBFillIT.java | 6 - .../db/integration/IoTDBFloatPrecisionIT.java | 6 - .../db/integration/IoTDBFlushQueryMergeTest.java | 13 +- .../iotdb/db/integration/IoTDBGroupbyDeviceIT.java | 5 - .../iotdb/db/integration/IoTDBLargeDataIT.java | 5 - .../iotdb/db/integration/IoTDBLimitSlimitIT.java | 5 - .../integration/IoTDBLoadExternalTsfileTest.java | 56 +- .../iotdb/db/integration/IoTDBMergeTest.java | 6 - .../iotdb/db/integration/IoTDBMetadataFetchIT.java | 6 - .../iotdb/db/integration/IoTDBMultiSeriesIT.java | 50 +- .../db/integration/IoTDBMultiStatementsIT.java | 6 - .../iotdb/db/integration/IoTDBNumberPathIT.java | 6 - .../iotdb/db/integration/IoTDBQueryDemoIT.java | 4 - .../iotdb/db/integration/IoTDBQuotedPathIT.java | 5 - ...IoTDBAggregationIT.java => IoTDBRecoverIT.java} | 317 ++------ .../db/integration/IoTDBSequenceDataQueryIT.java | 5 - .../iotdb/db/integration/IoTDBSeriesReaderIT.java | 5 - .../iotdb/db/integration/IoTDBSimpleQueryTest.java | 5 - .../iotdb/db/integration/IoTDBTimeZoneIT.java | 5 - .../apache/iotdb/db/integration/IoTDBTtlIT.java | 6 - .../iotdb/db/integration/IoTDBVersionIT.java | 6 - .../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 59 +- .../fileRelated/UnSealedTsFileReaderTest.java | 13 +- .../NewUnseqResourceMergeReaderTest.java | 13 +- .../resourceRelated/SeqResourceReaderTest.java | 7 +- .../resourceRelated/UnseqResourceReaderTest.java | 5 +- .../db/sync/receiver/load/FileLoaderTest.java | 55 +- .../recover/SyncReceiverLogAnalyzerTest.java | 7 +- .../db/sync/sender/manage/SyncFileManagerTest.java | 186 +++-- .../sender/recover/SyncSenderLogAnalyzerTest.java | 64 +- .../apache/iotdb/db/tools/IoTDBWatermarkTest.java | 16 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 59 +- .../iotdb/db/writelog/IoTDBLogFileSizeTest.java | 7 - .../apache/iotdb/db/writelog/PerformanceTest.java | 2 + .../db/writelog/recover/SeqTsFileRecoverTest.java | 2 +- .../writelog/recover/UnseqTsFileRecoverTest.java | 3 +- server/src/test/resources/logback.xml | 2 +- service-rpc/src/pypi/setup.py | 2 +- session/pom.xml | 9 +- .../java/org/apache/iotdb/session/Session.java | 129 +++- .../org/apache/iotdb/session/IoTDBSessionIT.java | 335 ++++++++- .../iotdb/session/utils/EnvironmentUtils.java | 190 ----- .../iotdb/tsfile/file/metadata/ChunkMetaData.java | 3 +- .../tsfile/fileSystem/fsFactory/FSFactory.java | 67 ++ .../tsfile/fileSystem/fsFactory/HDFSFactory.java | 15 + .../fileSystem/fsFactory/LocalFSFactory.java | 9 + 148 files changed, 2871 insertions(+), 1827 deletions(-) diff --cc client/src/main/java/org/apache/iotdb/client/AbstractClient.java index 0531ba0,23c2440..a5a85eb --- a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java +++ b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java @@@ -527,33 -485,16 +527,33 @@@ public abstract class AbstractClient } maxValueLength = tmp; } - for (int i = 2; i <= colCount; i++) { - if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) { - blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+"); - } else { - blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+"); + if (printTimestamp) { + for (int i = 2; i <= colCount; i++) { + if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) { + blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+"); + } else { + blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+"); + } + } + } else { + for (int i = 1; i <= colCount; i++) { + blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+"); } } - } else { - blockLine.append("+"); + } + // for disable align clause + else { + int tmp = Integer.MIN_VALUE; for (int i = 1; i <= colCount; i++) { + int len = resultSetMetaData.getColumnLabel(i).length(); + tmp = Math.max(tmp, len); + } + maxValueLength = tmp; + blockLine.append("+"); + for (int i = 2; i <= colCount / 2 + 1; i++) { + if (printTimestamp) { + blockLine.append(StringUtils.repeat('-', maxTimeLength)).append("+"); - } ++ } blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+"); } } diff --cc example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java index cffdd32,99a6193..8b4bc7b --- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java +++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java @@@ -13,11 -31,23 +13,11 @@@ public class JDBCExample Class.forName("org.apache.iotdb.jdbc.IoTDBDriver"); try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { - statement.execute("SET STORAGE GROUP TO root.sg1"); - statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE"); - statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE"); - statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE"); - - for (int i = 0; i <= 100; i++) { - statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")"); - } - statement.executeBatch(); - statement.clearBatch(); - - ResultSet resultSet = statement.executeQuery("select * from root where time <= 10"); - outputResult(resultSet); - resultSet = statement.executeQuery("select count(*) from root"); - outputResult(resultSet); - resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)"); + long startTime = System.currentTimeMillis(); - ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000"); ++ ResultSet resultSet = statement.executeQuery("select * from root where time < 100000000 disable align"); outputResult(resultSet); + long endTime = System.currentTimeMillis(); + System.out.println("Cost Time: " + (endTime - startTime)); } } diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java index 09c648f,479849a..e4e8e31 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java @@@ -18,30 -18,31 +18,27 @@@ */ package org.apache.iotdb.db.conf; --import java.io.File; --import java.io.FileInputStream; --import java.io.FileOutputStream; --import java.io.IOException; --import java.io.InputStreamReader; --import java.util.Properties; -- import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import java.io.*; ++import java.util.Properties; ++ public class IoTDBConfigCheck { // this file is located in data/system/schema/system_properties. // If user delete folder "data", system_properties can reset. public static final String PROPERTIES_FILE_NAME = "system.properties"; public static final String SCHEMA_DIR = -- IoTDBDescriptor.getInstance().getConfig().getSchemaDir(); ++ IoTDBDescriptor.getInstance().getConfig().getSchemaDir(); private static final IoTDBConfigCheck INSTANCE = new IoTDBConfigCheck(); private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class); - private Properties properties = new Properties(); // this is a initial parameter. private static String TIMESTAMP_PRECISION = "ms"; + private static long PARTITION_INTERVAL = 86400; + private Properties properties = new Properties(); public static final IoTDBConfigCheck getInstance() { return IoTDBConfigCheck.INSTANCE; @@@ -49,6 -50,24 +46,24 @@@ public void checkConfig() { TIMESTAMP_PRECISION = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision(); + + // check time stamp precision + if (!(TIMESTAMP_PRECISION.equals("ms") || TIMESTAMP_PRECISION.equals("us") - || TIMESTAMP_PRECISION.equals("ns"))) { ++ || TIMESTAMP_PRECISION.equals("ns"))) { + logger.error("Wrong timestamp precision, please set as: ms, us or ns ! Current is: " - + TIMESTAMP_PRECISION); ++ + TIMESTAMP_PRECISION); + System.exit(-1); + } + + PARTITION_INTERVAL = IoTDBDescriptor.getInstance().getConfig() - .getPartitionInterval(); ++ .getPartitionInterval(); + + // check partition interval + if (PARTITION_INTERVAL <= 0) { + logger.error("Partition interval must larger than 0!"); + System.exit(-1); + } + createDir(SCHEMA_DIR); checkFile(SCHEMA_DIR); logger.info("System configuration is ok."); @@@ -65,7 -84,8 +80,8 @@@ private void checkFile(String filepath) { // create file : read timestamp precision from engine.properties, create system_properties.txt // use output stream to write timestamp precision to file. - File file = SystemFileFactory.INSTANCE.getFile(filepath + File.separator + PROPERTIES_FILE_NAME); + File file = SystemFileFactory.INSTANCE - .getFile(filepath + File.separator + PROPERTIES_FILE_NAME); ++ .getFile(filepath + File.separator + PROPERTIES_FILE_NAME); try { if (!file.exists()) { file.createNewFile(); @@@ -79,12 -100,19 +96,19 @@@ logger.error("Can not create {}.", file.getAbsolutePath(), e); } // get existed properties from system_properties.txt - File inputFile = SystemFileFactory.INSTANCE.getFile(filepath + File.separator + PROPERTIES_FILE_NAME); + File inputFile = SystemFileFactory.INSTANCE - .getFile(filepath + File.separator + PROPERTIES_FILE_NAME); ++ .getFile(filepath + File.separator + PROPERTIES_FILE_NAME); try (FileInputStream inputStream = new FileInputStream(inputFile.toString())) { properties.load(new InputStreamReader(inputStream, TSFileConfig.STRING_CHARSET)); if (!properties.getProperty("timestamp_precision").equals(TIMESTAMP_PRECISION)) { logger.error("Wrong timestamp precision, please set as: " + properties -- .getProperty("timestamp_precision") + " !"); ++ .getProperty("timestamp_precision") + " !"); + System.exit(-1); + } + if (!(Long.parseLong(properties.getProperty("storage_group_time_range")) - == PARTITION_INTERVAL)) { ++ == PARTITION_INTERVAL)) { + logger.error("Wrong storage group time range, please set as: " + properties - .getProperty("storage_group_time_range") + " !"); ++ .getProperty("storage_group_time_range") + " !"); System.exit(-1); } } catch (IOException e) { diff --cc server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java index c2e72e5,0000000..b024450 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java @@@ -1,352 -1,0 +1,354 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.dataset; + +import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; +import org.apache.iotdb.db.query.reader.ManagedSeriesReader; +import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; +import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; ++import java.util.concurrent.atomic.AtomicIntegerArray; + +public class NonAlignEngineDataSet extends QueryDataSet { + + private static class ReadTask implements Runnable { + + private final ManagedSeriesReader reader; + private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue; + private WatermarkEncoder encoder; + NonAlignEngineDataSet dataSet; + private int index; + + + public ReadTask(ManagedSeriesReader reader, + BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue, + WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) { + this.reader = reader; + this.blockingQueue = blockingQueue; + this.encoder = encoder; + this.dataSet = dataSet; + this.index = index; + } + + @Override + public void run() { + PublicBAOS timeBAOS = new PublicBAOS(); + PublicBAOS valueBAOS = new PublicBAOS(); + try { + synchronized (reader) { + // if the task is submitted, there must be free space in the queue + // so here we don't need to check whether the queue has free space + // the reader has next batch + if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent()) + || reader.hasNextBatch()) { + BatchData batchData; + if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent()) + batchData = dataSet.cachedBatchData[index]; + else + batchData = reader.nextBatch(); + + int rowCount = 0; + while (rowCount < dataSet.fetchSize) { + - if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) { ++ if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray.get(index) >= dataSet.limit)) { + break; + } + + if (batchData != null && batchData.hasCurrent()) { - if (dataSet.offsetArray[index] == 0) { ++ if (dataSet.offsetArray.get(index) == 0) { + long time = batchData.currentTime(); + ReadWriteIOUtils.write(time, timeBAOS); + TSDataType type = batchData.getDataType(); + switch (type) { + case INT32: + int intValue = batchData.getInt(); + if (encoder != null && encoder.needEncode(time)) { + intValue = encoder.encodeInt(intValue, time); + } + ReadWriteIOUtils.write(intValue, valueBAOS); + break; + case INT64: + long longValue = batchData.getLong(); + if (encoder != null && encoder.needEncode(time)) { + longValue = encoder.encodeLong(longValue, time); + } + ReadWriteIOUtils.write(longValue, valueBAOS); + break; + case FLOAT: + float floatValue = batchData.getFloat(); + if (encoder != null && encoder.needEncode(time)) { + floatValue = encoder.encodeFloat(floatValue, time); + } + ReadWriteIOUtils.write(floatValue, valueBAOS); + break; + case DOUBLE: + double doubleValue = batchData.getDouble(); + if (encoder != null && encoder.needEncode(time)) { + doubleValue = encoder.encodeDouble(doubleValue, time); + } + ReadWriteIOUtils.write(doubleValue, valueBAOS); + break; + case BOOLEAN: + ReadWriteIOUtils.write(batchData.getBoolean(), + valueBAOS); + break; + case TEXT: + ReadWriteIOUtils + .write(batchData.getBinary(), + valueBAOS); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", type)); + } + } + batchData.next(); + } + else { + if (reader.hasNextBatch()) { + batchData = reader.nextBatch(); + dataSet.cachedBatchData[index] = batchData; + continue; + } + else + break; + } - if (dataSet.offsetArray[index] == 0) { ++ if (dataSet.offsetArray.get(index) == 0) { + rowCount++; + if (dataSet.limit > 0) { - dataSet.alreadyReturnedRowNumArray[index]++; ++ dataSet.alreadyReturnedRowNumArray.incrementAndGet(index); + } + } else { - dataSet.offsetArray[index]--; ++ dataSet.offsetArray.decrementAndGet(index); + } + } + if (rowCount == 0) { + blockingQueue.put(new Pair(null, null)); + // set the hasRemaining field in reader to false + // tell the Consumer not to submit another task for this reader any more + reader.setHasRemaining(false); + // remove itself from the QueryTaskPoolManager + reader.setManagedByQueryManager(false); + return; + } + + ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size()); + timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size()); + timeBuffer.flip(); + ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size()); + valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size()); + valueBuffer.flip(); + + Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer); + + blockingQueue.put(timeValueBAOSPair); + // if the queue also has free space, just submit another itself + if (blockingQueue.remainingCapacity() > 0) { + pool.submit(this); + } + // the queue has no more space + // remove itself from the QueryTaskPoolManager + else { + reader.setManagedByQueryManager(false); + } + return; + } + blockingQueue.put(new Pair(null, null)); + // set the hasRemaining field in reader to false + // tell the Consumer not to submit another task for this reader any more + reader.setHasRemaining(false); + // remove itself from the QueryTaskPoolManager + reader.setManagedByQueryManager(false); + } + } catch (InterruptedException e) { + LOGGER.error("Interrupted while putting into the blocking queue: ", e); + } catch (IOException e) { + LOGGER.error("Something gets wrong while reading from the series reader: ", e); + } catch (Exception e) { + LOGGER.error("Something gets wrong: ", e); + } + + } + + } + + + private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList; + + // Blocking queue list for each time value buffer pair + private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray; + + private boolean initialized = false; + - private int[] offsetArray; ++ private AtomicIntegerArray offsetArray; + + private int limit; + - private int[] alreadyReturnedRowNumArray; ++ private AtomicIntegerArray alreadyReturnedRowNumArray; + + private BatchData[] cachedBatchData; + + // indicate that there is no more batch data in the corresponding queue + // in case that the consumer thread is blocked on the queue and won't get runnable any more + // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter + // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false + // noMoreDataInQueue can still be true + // its usage is to tell the consumer thread not to call the take() method. + private boolean[] noMoreDataInQueueArray; + + private int fetchSize; + + // indicate that there is no more batch data in the corresponding queue + // in case that the consumer thread is blocked on the queue and won't get runnable any more + // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter + // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false + // noMoreDataInQueue can still be true + // its usage is to tell the consumer thread not to call the take() method. + + // capacity for blocking queue + private static final int BLOCKING_QUEUE_CAPACITY = 5; + + private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance(); + + private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class); + + /** + * constructor of EngineDataSet. + * + * @param paths paths in List structure + * @param dataTypes time series data type + * @param readers readers in List(IPointReader) structure + */ + public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes, + List<ManagedSeriesReader> readers) { + super(paths, dataTypes); + this.seriesReaderWithoutValueFilterList = readers; + blockingQueueArray = new BlockingQueue[readers.size()]; + noMoreDataInQueueArray = new boolean[readers.size()]; + for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) { + blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); + } + } + + private void initLimit(int offset, int limit, int size) { - offsetArray = new int[size]; - Arrays.fill(offsetArray, offset); ++ int[] offsetArrayTemp = new int[size]; ++ Arrays.fill(offsetArrayTemp, offset); ++ offsetArray = new AtomicIntegerArray(offsetArrayTemp); + this.limit = limit; - alreadyReturnedRowNumArray = new int[size]; ++ this.alreadyReturnedRowNumArray = new AtomicIntegerArray(size); + cachedBatchData = new BatchData[size]; + } + + private void init(WatermarkEncoder encoder, int fetchSize) { + initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size()); + this.fetchSize = fetchSize; + for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) { + ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i); + reader.setHasRemaining(true); + reader.setManagedByQueryManager(true); + pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i)); + } + this.initialized = true; + } + + /** + * for RPC in RawData query between client and server + * fill time buffers and value buffers + */ + public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException { + if (!initialized) { + init(encoder, fetchSize); + } + int seriesNum = seriesReaderWithoutValueFilterList.size(); + TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet(); + + List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum); + List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum); + + for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) { + if (!noMoreDataInQueueArray[seriesIndex]) { + Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take(); + if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) { + noMoreDataInQueueArray[seriesIndex] = true; + timeValueByteBufferPair.left = ByteBuffer.allocate(0); + timeValueByteBufferPair.right = ByteBuffer.allocate(0); + } + timeBufferList.add(timeValueByteBufferPair.left); + valueBufferList.add(timeValueByteBufferPair.right); + } + else { + timeBufferList.add(ByteBuffer.allocate(0)); + valueBufferList.add(ByteBuffer.allocate(0)); + continue; + } + + synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) { + if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) { + ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex); + // if the reader isn't being managed and still has more data, + // that means this read task leave the pool before because the queue has no more space + // now we should submit it again + if (!reader.isManagedByQueryManager() && reader.hasRemaining()) { + reader.setManagedByQueryManager(true); + pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], + encoder, this, seriesIndex)); + } + } + } + } + + // set time buffers, value buffers and bitmap buffers + tsQueryNonAlignDataSet.setTimeList(timeBufferList); + tsQueryNonAlignDataSet.setValueList(valueBufferList); + + return tsQueryNonAlignDataSet; + } + + + @Override + protected boolean hasNextWithoutConstraint() { + return false; + } + + @Override + protected RowRecord nextWithoutConstraint() { + return null; + } + + +} diff --cc server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java index bc70fa8,f6582ff..0a10375 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java @@@ -89,32 -88,6 +89,28 @@@ public class EngineExecutor throw new StorageEngineException(e.getMessage()); } } + + public QueryDataSet executeNonAlign(QueryContext context) + throws StorageEngineException, IOException { + + Filter timeFilter = null; + if (optimizedExpression != null) { + timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter(); + } + + List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>(); + for (int i = 0; i < deduplicatedPaths.size(); i++) { + Path path = deduplicatedPaths.get(i); + TSDataType dataType = deduplicatedDataTypes.get(i); + + ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context, + true); + readersOfSelectedSeries.add(reader); + } + - try { - return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes, - readersOfSelectedSeries); - } catch (InterruptedException e) { - throw new StorageEngineException(e.getMessage()); - } ++ return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes, ++ readersOfSelectedSeries); + } /** * executeWithValueFilter query. diff --cc server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java index 4b03cb1,3750746..87157bf --- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java @@@ -18,8 -18,8 +18,6 @@@ */ package org.apache.iotdb.db.rescon; --import java.util.ArrayDeque; --import java.util.Deque; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.memtable.IMemTable; @@@ -27,6 -27,6 +25,9 @@@ import org.apache.iotdb.db.engine.memta import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import java.util.ArrayDeque; ++import java.util.Deque; ++ public class MemTablePool { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); diff --cc session/pom.xml index bc7b83b,8df9ae6..dab30aa --- a/session/pom.xml +++ b/session/pom.xml @@@ -19,125 -19,134 +19,132 @@@ under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>iotdb-parent</artifactId> - <groupId>org.apache.iotdb</groupId> - <version>0.10.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>iotdb-session</artifactId> - <name>IoTDB Session</name> - <properties> - <session.test.skip>false</session.test.skip> - <session.it.skip>${session.test.skip}</session.it.skip> - <session.ut.skip>${session.test.skip}</session.ut.skip> - </properties> - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>3.1.0</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <!-- this is used for inheritance merges --> - <phase>package</phase> - <!-- bind to the packaging phase --> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - <!--using `mvn test` to run UT, `mvn verify` to run ITs +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>iotdb-parent</artifactId> + <groupId>org.apache.iotdb</groupId> + <version>0.10.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>iotdb-session</artifactId> + <name>IoTDB Session</name> + <properties> + <session.test.skip>false</session.test.skip> + <session.it.skip>${session.test.skip}</session.it.skip> + <session.ut.skip>${session.test.skip}</session.ut.skip> + </properties> + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.1.0</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <!-- this is used for inheritance merges --> + <phase>package</phase> + <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <!--using `mvn test` to run UT, `mvn verify` to run ITs - Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/--> + Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skipTests>${session.ut.skip}</skipTests> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <executions> - <execution> - <id>run-integration-tests</id> - <phase>integration-test</phase> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - </execution> - </executions> - <configuration> - <skipTests>${session.test.skip}</skipTests> - <skipITs>${session.it.skip}</skipITs> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>service-rpc</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>tsfile</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-server</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-server</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-jdbc</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - </dependencies> - <profiles> - <profile> - <id>skipSessionTests</id> - <activation> - <property> - <name>skipTests</name> - <value>true</value> - </property> - </activation> - <properties> - <session.test.skip>true</session.test.skip> - <session.ut.skip>true</session.ut.skip> - <session.it.skip>true</session.it.skip> - </properties> - </profile> - <profile> - <id>skipUT_SessionTests</id> - <activation> - <property> - <name>skipUTs</name> - <value>true</value> - </property> - </activation> - <properties> - <session.ut.skip>true</session.ut.skip> - </properties> - </profile> - </profiles> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>${session.ut.skip}</skipTests> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <id>run-integration-tests</id> + <phase>integration-test</phase> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + <configuration> + <skipTests>${session.test.skip}</skipTests> + <skipITs>${session.it.skip}</skipITs> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>service-rpc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>tsfile</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-server</artifactId> + <version>${project.version}</version> ++ <type>test-jar</type> ++ <scope>test</scope> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.iotdb</groupId> ++ <artifactId>iotdb-server</artifactId> ++ <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-jdbc</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + </dependencies> + <profiles> + <profile> + <id>skipSessionTests</id> + <activation> + <property> + <name>skipTests</name> + <value>true</value> + </property> + </activation> + <properties> + <session.test.skip>true</session.test.skip> + <session.ut.skip>true</session.ut.skip> + <session.it.skip>true</session.it.skip> + </properties> + </profile> + <profile> + <id>skipUT_SessionTests</id> + <activation> + <property> + <name>skipUTs</name> + <value>true</value> + </property> + </activation> + <properties> + <session.ut.skip>true</session.ut.skip> + </properties> + </profile> + </profiles> </project>
