This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch DisableAlign in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit be4b8af0d06f2cd58c162b3f147e9f9f0237309a Author: JackieTien97 <[email protected]> AuthorDate: Tue Jan 14 15:56:03 2020 +0800 s --- .../main/java/org/apache/iotdb/JDBCExample.java | 47 +---- .../db/query/dataset/NonAlignEngineDataSet.java | 226 ++++++++++++--------- 2 files changed, 137 insertions(+), 136 deletions(-) diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java index 99a6193..cffdd32 100644 --- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java +++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java @@ -1,21 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ package org.apache.iotdb; import java.sql.Connection; @@ -31,23 +13,11 @@ public class JDBCExample { Class.forName("org.apache.iotdb.jdbc.IoTDBDriver"); try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { - statement.execute("SET STORAGE GROUP TO root.sg1"); - statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE"); - statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE"); - statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE"); - - for (int i = 0; i <= 100; i++) { - statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")"); - } - statement.executeBatch(); - statement.clearBatch(); - - ResultSet resultSet = statement.executeQuery("select * from root where time <= 10"); - outputResult(resultSet); - resultSet = statement.executeQuery("select count(*) from root"); - outputResult(resultSet); - resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)"); + long startTime = System.currentTimeMillis(); + ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000"); outputResult(resultSet); + long endTime = System.currentTimeMillis(); + System.out.println("Cost Time: " + (endTime - startTime)); } } @@ -61,15 +31,6 @@ public class JDBCExample { } System.out.println(); while (resultSet.next()) { - for (int i = 1; ; i++) { - System.out.print(resultSet.getString(i)); - if (i < columnCount) { - System.out.print(", "); - } else { - System.out.println(); - break; - } - } } System.out.println("--------------------------\n"); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java index 1474bba..c2e72e5 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java @@ -19,13 +19,6 @@ package org.apache.iotdb.db.query.dataset; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; import org.apache.iotdb.db.query.reader.ManagedSeriesReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; @@ -42,28 +35,33 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + public class NonAlignEngineDataSet extends QueryDataSet { - + private static class ReadTask implements Runnable { - + private final ManagedSeriesReader reader; - private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue; + private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue; private WatermarkEncoder encoder; - private int rowOffset; - private int fetchSize; - private int rowLimit; - private int alreadyReturnedRowNum = 0; - - public ReadTask(ManagedSeriesReader reader, - BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue, - WatermarkEncoder encoder, - int rowOffset, int rowLimit, int fetchSize) { + NonAlignEngineDataSet dataSet; + private int index; + + + public ReadTask(ManagedSeriesReader reader, + BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue, + WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) { this.reader = reader; this.blockingQueue = blockingQueue; this.encoder = encoder; - this.rowOffset = rowOffset; - this.rowLimit = rowLimit; - this.fetchSize = fetchSize; + this.dataSet = dataSet; + this.index = index; } @Override @@ -75,18 +73,23 @@ public class NonAlignEngineDataSet extends QueryDataSet { // if the task is submitted, there must be free space in the queue // so here we don't need to check whether the queue has free space // the reader has next batch - if (reader.hasNextBatch()) { - BatchData batchData = reader.nextBatch(); - + if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent()) + || reader.hasNextBatch()) { + BatchData batchData; + if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent()) + batchData = dataSet.cachedBatchData[index]; + else + batchData = reader.nextBatch(); + int rowCount = 0; - while (rowCount < fetchSize) { - - if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) { + while (rowCount < dataSet.fetchSize) { + + if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) { break; } - + if (batchData != null && batchData.hasCurrent()) { - if (rowOffset == 0) { + if (dataSet.offsetArray[index] == 0) { long time = batchData.currentTime(); ReadWriteIOUtils.write(time, timeBAOS); TSDataType type = batchData.getDataType(); @@ -136,20 +139,42 @@ public class NonAlignEngineDataSet extends QueryDataSet { batchData.next(); } else { - break; + if (reader.hasNextBatch()) { + batchData = reader.nextBatch(); + dataSet.cachedBatchData[index] = batchData; + continue; + } + else + break; } - if (rowOffset == 0) { + if (dataSet.offsetArray[index] == 0) { rowCount++; - if (rowLimit > 0) { - alreadyReturnedRowNum++; + if (dataSet.limit > 0) { + dataSet.alreadyReturnedRowNumArray[index]++; } } else { - rowOffset--; + dataSet.offsetArray[index]--; } } - - Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS); - + if (rowCount == 0) { + blockingQueue.put(new Pair(null, null)); + // set the hasRemaining field in reader to false + // tell the Consumer not to submit another task for this reader any more + reader.setHasRemaining(false); + // remove itself from the QueryTaskPoolManager + reader.setManagedByQueryManager(false); + return; + } + + ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size()); + timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size()); + timeBuffer.flip(); + ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size()); + valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size()); + valueBuffer.flip(); + + Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer); + blockingQueue.put(timeValueBAOSPair); // if the queue also has free space, just submit another itself if (blockingQueue.remainingCapacity() > 0) { @@ -162,7 +187,7 @@ public class NonAlignEngineDataSet extends QueryDataSet { } return; } - blockingQueue.put(new Pair(timeBAOS, valueBAOS)); + blockingQueue.put(new Pair(null, null)); // set the hasRemaining field in reader to false // tell the Consumer not to submit another task for this reader any more reader.setHasRemaining(false); @@ -176,19 +201,37 @@ public class NonAlignEngineDataSet extends QueryDataSet { } catch (Exception e) { LOGGER.error("Something gets wrong: ", e); } - + } - + } - + private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList; - + // Blocking queue list for each time value buffer pair - private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray; - + private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray; + private boolean initialized = false; - + + private int[] offsetArray; + + private int limit; + + private int[] alreadyReturnedRowNumArray; + + private BatchData[] cachedBatchData; + + // indicate that there is no more batch data in the corresponding queue + // in case that the consumer thread is blocked on the queue and won't get runnable any more + // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter + // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false + // noMoreDataInQueue can still be true + // its usage is to tell the consumer thread not to call the take() method. + private boolean[] noMoreDataInQueueArray; + + private int fetchSize; + // indicate that there is no more batch data in the corresponding queue // in case that the consumer thread is blocked on the queue and won't get runnable any more // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter @@ -202,7 +245,7 @@ public class NonAlignEngineDataSet extends QueryDataSet { private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class); - + /** * constructor of EngineDataSet. * @@ -211,102 +254,99 @@ public class NonAlignEngineDataSet extends QueryDataSet { * @param readers readers in List(IPointReader) structure */ public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes, - List<ManagedSeriesReader> readers) throws InterruptedException { + List<ManagedSeriesReader> readers) { super(paths, dataTypes); this.seriesReaderWithoutValueFilterList = readers; blockingQueueArray = new BlockingQueue[readers.size()]; + noMoreDataInQueueArray = new boolean[readers.size()]; for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) { blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); } } - - private void init(WatermarkEncoder encoder, int fetchSize) - throws InterruptedException { + + private void initLimit(int offset, int limit, int size) { + offsetArray = new int[size]; + Arrays.fill(offsetArray, offset); + this.limit = limit; + alreadyReturnedRowNumArray = new int[size]; + cachedBatchData = new BatchData[size]; + } + + private void init(WatermarkEncoder encoder, int fetchSize) { + initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size()); + this.fetchSize = fetchSize; for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) { ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i); reader.setHasRemaining(true); reader.setManagedByQueryManager(true); - pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize)); + pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i)); } this.initialized = true; } - + /** * for RPC in RawData query between client and server * fill time buffers and value buffers */ - public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException { + public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException { if (!initialized) { init(encoder, fetchSize); } int seriesNum = seriesReaderWithoutValueFilterList.size(); TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet(); - PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum]; - PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum]; - + List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum); + List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum); + for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) { + if (!noMoreDataInQueueArray[seriesIndex]) { + Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take(); + if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) { + noMoreDataInQueueArray[seriesIndex] = true; + timeValueByteBufferPair.left = ByteBuffer.allocate(0); + timeValueByteBufferPair.right = ByteBuffer.allocate(0); + } + timeBufferList.add(timeValueByteBufferPair.left); + valueBufferList.add(timeValueByteBufferPair.right); + } + else { + timeBufferList.add(ByteBuffer.allocate(0)); + valueBufferList.add(ByteBuffer.allocate(0)); + continue; + } + synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) { if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) { ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex); // if the reader isn't being managed and still has more data, // that means this read task leave the pool before because the queue has no more space // now we should submit it again - if (reader.isManagedByQueryManager() && reader.hasRemaining()) { + if (!reader.isManagedByQueryManager() && reader.hasRemaining()) { reader.setManagedByQueryManager(true); - pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], - encoder, rowOffset, rowLimit, fetchSize)); + pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], + encoder, this, seriesIndex)); } } - Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll(); - if (timevalueBAOSPair != null) { - timeBAOSList[seriesIndex] = timevalueBAOSPair.left; - valueBAOSList[seriesIndex] = timevalueBAOSPair.right; - } - else { - timeBAOSList[seriesIndex] = new PublicBAOS(); - valueBAOSList[seriesIndex] = new PublicBAOS(); - } } } - List<ByteBuffer> timeBufferList = new ArrayList<>(); - List<ByteBuffer> valueBufferList = new ArrayList<>(); - - for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) { - // add time buffer of current series - putPBOSToBuffer(timeBAOSList, timeBufferList, seriesIndex); - - // add value buffer of current series - putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex); - } - // set time buffers, value buffers and bitmap buffers tsQueryNonAlignDataSet.setTimeList(timeBufferList); tsQueryNonAlignDataSet.setValueList(valueBufferList); return tsQueryNonAlignDataSet; } - - private void putPBOSToBuffer(PublicBAOS[] aBAOSList, List<ByteBuffer> aBufferList, - int tsIndex) { - ByteBuffer aBuffer = ByteBuffer.allocate(aBAOSList[tsIndex].size()); - aBuffer.put(aBAOSList[tsIndex].getBuf(), 0, aBAOSList[tsIndex].size()); - aBuffer.flip(); - aBufferList.add(aBuffer); - } + @Override - protected boolean hasNextWithoutConstraint() throws IOException { - // TODO Auto-generated method stub + protected boolean hasNextWithoutConstraint() { return false; } @Override - protected RowRecord nextWithoutConstraint() throws IOException { - // TODO Auto-generated method stub + protected RowRecord nextWithoutConstraint() { return null; } - + }
