This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b0033afcaf5 Optimized the overall performance of IoTDB & Fixed the NPE
in LimitOperatorTest (#17664) (#17819)
b0033afcaf5 is described below
commit b0033afcaf55585166c99bd8a31d90991fa1f5ae
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 11:30:47 2026 +0800
Optimized the overall performance of IoTDB & Fixed the NPE in
LimitOperatorTest (#17664) (#17819)
* Opt
* Update UnclosedFileScanHandleImpl.java
* Update StorageEngine.java
* Update ClosedFileScanHandleImpl.java
* column index
* spt
* Address performance review comments
* fix
(cherry picked from commit 89730b14397d675f2255dc2a8e8069168d18c7c3)
---
.../client-cpp/src/main/SessionDataSet.cpp | 26 +--
.../org/apache/iotdb/isession/SessionDataSet.java | 39 ++---
.../org/apache/iotdb/rpc/IoTDBJDBCDataSet.java | 47 ++++--
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 8 +
.../java/org/apache/iotdb/session/Session.java | 130 +++++++++------
.../analyze/cache/schema/DeviceSchemaCache.java | 2 +-
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 17 +-
.../iotdb/db/storageengine/StorageEngine.java | 16 +-
.../filescan/impl/ClosedFileScanHandleImpl.java | 45 +++---
.../filescan/impl/UnclosedFileScanHandleImpl.java | 83 +++++++---
.../dataregion/tsfile/TsFileManager.java | 21 +--
.../apache/iotdb/db/utils/ModificationUtils.java | 19 ++-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 179 +++++----------------
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 37 +++++
.../execution/operator/LimitOperatorTest.java | 3 +
15 files changed, 363 insertions(+), 309 deletions(-)
diff --git a/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
b/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
index 30e11fa1efb..fe28714419a 100644
--- a/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
+++ b/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
@@ -211,39 +211,41 @@ const std::vector<std::string>&
SessionDataSet::DataIterator::getColumnTypeList(
shared_ptr<RowRecord> SessionDataSet::constructRowRecordFromValueArray() {
std::vector<Field> outFields;
- for (int i = iotdbRpcDataSet_->getValueColumnStartIndex(); i <
iotdbRpcDataSet_->getColumnSize(); i++) {
+ const int32_t valueColumnStartIndex =
iotdbRpcDataSet_->getValueColumnStartIndex();
+ const int32_t columnSize = iotdbRpcDataSet_->getColumnSize();
+ outFields.reserve(columnSize - valueColumnStartIndex);
+ for (int32_t columnIndex = valueColumnStartIndex + 1; columnIndex <=
columnSize; ++columnIndex) {
Field field;
- std::string columnName = iotdbRpcDataSet_->getColumnNameList().at(i);
- if (!iotdbRpcDataSet_->isNullByColumnName(columnName)) {
- TSDataType::TSDataType dataType =
iotdbRpcDataSet_->getDataType(columnName);
+ if (!iotdbRpcDataSet_->isNullByIndex(columnIndex)) {
+ TSDataType::TSDataType dataType =
iotdbRpcDataSet_->getDataTypeByIndex(columnIndex);
field.dataType = dataType;
switch (dataType) {
case TSDataType::BOOLEAN:
- field.boolV = iotdbRpcDataSet_->getBoolean(columnName);
+ field.boolV = iotdbRpcDataSet_->getBooleanByIndex(columnIndex);
break;
case TSDataType::INT32:
- field.intV = iotdbRpcDataSet_->getInt(columnName);
+ field.intV = iotdbRpcDataSet_->getIntByIndex(columnIndex);
break;
case TSDataType::DATE:
- field.dateV =
parseIntToDate(iotdbRpcDataSet_->getInt(columnName));
+ field.dateV = iotdbRpcDataSet_->getDateByIndex(columnIndex);
break;
case TSDataType::INT64:
case TSDataType::TIMESTAMP:
- field.longV = iotdbRpcDataSet_->getLong(columnName);
+ field.longV = iotdbRpcDataSet_->getLongByIndex(columnIndex);
break;
case TSDataType::FLOAT:
- field.floatV = iotdbRpcDataSet_->getFloat(columnName);
+ field.floatV = iotdbRpcDataSet_->getFloatByIndex(columnIndex);
break;
case TSDataType::DOUBLE:
- field.doubleV = iotdbRpcDataSet_->getDouble(columnName);
+ field.doubleV =
iotdbRpcDataSet_->getDoubleByIndex(columnIndex);
break;
case TSDataType::TEXT:
case TSDataType::BLOB:
case TSDataType::STRING:
- field.stringV =
iotdbRpcDataSet_->getBinary(columnName)->getStringValue();
+ field.stringV =
iotdbRpcDataSet_->getBinaryByIndex(columnIndex)->getStringValue();
break;
default:
- throw new UnSupportedDataTypeException("Data type %s is not
supported." + dataType);
+ throw UnSupportedDataTypeException("Data type %s is not
supported." + dataType);
}
}
outFields.emplace_back(field);
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
index 822c464ddca..fc671b83926 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
@@ -40,8 +40,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX;
-
public class SessionDataSet implements ISessionDataSet {
private final IoTDBRpcDataSet ioTDBRpcDataSet;
@@ -176,56 +174,45 @@ public class SessionDataSet implements ISessionDataSet {
}
private RowRecord constructRowRecordFromValueArray() throws
StatementExecutionException {
- List<Field> outFields = new ArrayList<>();
- for (int i = 0; i < ioTDBRpcDataSet.columnSize; i++) {
+ int valueColumnStartIndex = ioTDBRpcDataSet.getValueColumnStartIndex();
+ int columnSize = ioTDBRpcDataSet.getColumnSize();
+ List<Field> outFields = new ArrayList<>(columnSize -
valueColumnStartIndex);
+ for (int columnIndex = valueColumnStartIndex + 1; columnIndex <=
columnSize; columnIndex++) {
Field field;
-
- int index = i + 1;
- int datasetColumnIndex = i + START_INDEX;
- if (ioTDBRpcDataSet.ignoreTimeStamp) {
- index--;
- datasetColumnIndex--;
- }
- int loc =
-
ioTDBRpcDataSet.columnOrdinalMap.get(ioTDBRpcDataSet.columnNameList.get(index))
- - START_INDEX;
-
- if (!ioTDBRpcDataSet.isNull(datasetColumnIndex)) {
- TSDataType dataType =
ioTDBRpcDataSet.columnTypeDeduplicatedList.get(loc);
+ if (!ioTDBRpcDataSet.isNull(columnIndex)) {
+ TSDataType dataType = ioTDBRpcDataSet.getDataType(columnIndex);
field = new Field(dataType);
switch (dataType) {
case BOOLEAN:
- boolean booleanValue =
ioTDBRpcDataSet.getBoolean(datasetColumnIndex);
+ boolean booleanValue = ioTDBRpcDataSet.getBoolean(columnIndex);
field.setBoolV(booleanValue);
break;
case INT32:
case DATE:
- int intValue = ioTDBRpcDataSet.getInt(datasetColumnIndex);
+ int intValue = ioTDBRpcDataSet.getInt(columnIndex);
field.setIntV(intValue);
break;
case INT64:
case TIMESTAMP:
- long longValue = ioTDBRpcDataSet.getLong(datasetColumnIndex);
+ long longValue = ioTDBRpcDataSet.getLong(columnIndex);
field.setLongV(longValue);
break;
case FLOAT:
- float floatValue = ioTDBRpcDataSet.getFloat(datasetColumnIndex);
+ float floatValue = ioTDBRpcDataSet.getFloat(columnIndex);
field.setFloatV(floatValue);
break;
case DOUBLE:
- double doubleValue = ioTDBRpcDataSet.getDouble(datasetColumnIndex);
+ double doubleValue = ioTDBRpcDataSet.getDouble(columnIndex);
field.setDoubleV(doubleValue);
break;
case TEXT:
case BLOB:
case STRING:
- field.setBinaryV(ioTDBRpcDataSet.getBinary(datasetColumnIndex));
+ field.setBinaryV(ioTDBRpcDataSet.getBinary(columnIndex));
break;
default:
throw new UnSupportedDataTypeException(
- String.format(
- "Data type %s is not supported.",
- ioTDBRpcDataSet.columnTypeDeduplicatedList.get(i)));
+ String.format("Data type %s is not supported.", dataType));
}
} else {
field = new Field(null);
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
index 5bba6d0ea71..254c6d68598 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
@@ -127,20 +127,15 @@ public class IoTDBJDBCDataSet {
// deduplicate and map
if (columnNameIndex != null) {
- int deduplicatedColumnSize = (int)
columnNameIndex.values().stream().distinct().count();
- this.columnTypeDeduplicatedList = new
ArrayList<>(deduplicatedColumnSize);
- for (int i = 0; i < deduplicatedColumnSize; i++) {
- columnTypeDeduplicatedList.add(null);
- }
+ this.columnTypeDeduplicatedList =
+
initDeduplicatedColumnTypes(getDeduplicatedColumnSize(columnNameIndex));
for (int i = 0; i < columnNameList.size(); i++) {
String name = columnNameList.get(i);
this.columnNameList.add(name);
this.columnTypeList.add(columnTypeList.get(i));
if (!columnOrdinalMap.containsKey(name)) {
int index = columnNameIndex.get(name);
- if (!columnOrdinalMap.containsValue(index + START_INDEX)) {
- columnTypeDeduplicatedList.set(index,
TSDataType.valueOf(columnTypeList.get(i)));
- }
+ setColumnTypeIfAbsent(columnTypeDeduplicatedList, index,
columnTypeList.get(i));
columnOrdinalMap.put(name, index + START_INDEX);
}
}
@@ -241,11 +236,8 @@ public class IoTDBJDBCDataSet {
// deduplicate and map
if (columnNameIndex != null) {
- int deduplicatedColumnSize = (int)
columnNameIndex.values().stream().distinct().count();
- this.columnTypeDeduplicatedList = new
ArrayList<>(deduplicatedColumnSize);
- for (int i = 0; i < deduplicatedColumnSize; i++) {
- columnTypeDeduplicatedList.add(null);
- }
+ this.columnTypeDeduplicatedList =
+
initDeduplicatedColumnTypes(getDeduplicatedColumnSize(columnNameIndex));
for (int i = 0; i < columnNameList.size(); i++) {
String name = "";
if (sgList != null
@@ -261,9 +253,7 @@ public class IoTDBJDBCDataSet {
// "Time".equals(name) -> to allow the Time column appear in value
columns
if (!columnOrdinalMap.containsKey(name) || "Time".equals(name)) {
int index = columnNameIndex.get(name);
- if (!columnOrdinalMap.containsValue(index + START_INDEX)) {
- columnTypeDeduplicatedList.set(index,
TSDataType.valueOf(columnTypeList.get(i)));
- }
+ setColumnTypeIfAbsent(columnTypeDeduplicatedList, index,
columnTypeList.get(i));
columnOrdinalMap.put(name, index + START_INDEX);
}
}
@@ -318,6 +308,31 @@ public class IoTDBJDBCDataSet {
this.emptyResultSet = (queryDataSet == null ||
!queryDataSet.time.hasRemaining());
}
+ private static int getDeduplicatedColumnSize(Map<String, Integer>
columnNameIndex) {
+ int deduplicatedColumnSize = 0;
+ for (Integer index : columnNameIndex.values()) {
+ if (index != null && index + 1 > deduplicatedColumnSize) {
+ deduplicatedColumnSize = index + 1;
+ }
+ }
+ return deduplicatedColumnSize;
+ }
+
+ private static List<TSDataType> initDeduplicatedColumnTypes(int
deduplicatedColumnSize) {
+ List<TSDataType> columnTypes = new ArrayList<>(deduplicatedColumnSize);
+ for (int i = 0; i < deduplicatedColumnSize; i++) {
+ columnTypes.add(null);
+ }
+ return columnTypes;
+ }
+
+ private static void setColumnTypeIfAbsent(
+ List<TSDataType> columnTypeDeduplicatedList, int index, String
columnType) {
+ if (columnTypeDeduplicatedList.get(index) == null) {
+ columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnType));
+ }
+ }
+
public void close() throws StatementExecutionException, TException {
if (isClosed) {
return;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 8fc484fd00c..2ff01502fb3 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -634,4 +634,12 @@ public class IoTDBRpcDataSet {
throw new StatementExecutionException("No record remains");
}
}
+
+ public int getValueColumnStartIndex() {
+ return ignoreTimeStamp ? 0 : 1;
+ }
+
+ public int getColumnSize() {
+ return columnNameList.size();
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 461aef36245..c8d26f0cefe 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -788,13 +788,12 @@ public class Session implements ISession {
TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq();
request.setPrefixPath(prefixPath);
request.setMeasurements(measurements);
-
request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
-
request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
- request.setCompressors(
- compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
- request.setMeasurementAlias(measurementAliasList);
- request.setTagsList(tagsList);
- request.setAttributesList(attributesList);
+ request.setDataTypes(toDataTypeOrdinals(dataTypes));
+ request.setEncodings(toEncodingOrdinals(encodings));
+ request.setCompressors(toCompressionOrdinals(compressors));
+ request.setMeasurementAlias(replaceNullStrings(measurementAliasList));
+ request.setTagsList(replaceNullMaps(tagsList));
+ request.setAttributesList(replaceNullMaps(attributesList));
return request;
}
@@ -834,29 +833,14 @@ public class Session implements ISession {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
request.setPaths(paths);
-
- List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
- for (TSDataType dataType : dataTypes) {
- dataTypeOrdinals.add(dataType.ordinal());
- }
- request.setDataTypes(dataTypeOrdinals);
-
- List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
- for (TSEncoding encoding : encodings) {
- encodingOrdinals.add(encoding.ordinal());
- }
- request.setEncodings(encodingOrdinals);
-
- List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
- for (CompressionType compression : compressors) {
- compressionOrdinals.add((int) compression.serialize());
- }
- request.setCompressors(compressionOrdinals);
+ request.setDataTypes(toDataTypeOrdinals(dataTypes));
+ request.setEncodings(toEncodingOrdinals(encodings));
+ request.setCompressors(toCompressionOrdinals(compressors));
request.setPropsList(propsList);
- request.setTagsList(tagsList);
- request.setAttributesList(attributesList);
- request.setMeasurementAliasList(measurementAliasList);
+ request.setTagsList(replaceNullMaps(tagsList));
+ request.setAttributesList(replaceNullMaps(attributesList));
+ request.setMeasurementAliasList(replaceNullStrings(measurementAliasList));
return request;
}
@@ -1675,19 +1659,23 @@ public class Session implements ISession {
List<String> measurementsList,
List<TSDataType> types,
List<Object> valuesList) {
- Map<String, Object> nullMap = new HashMap<>();
+ Map<String, Object> nullMap = logger.isInfoEnabled() ? new HashMap<>() :
null;
for (int i = valuesList.size() - 1; i >= 0; i--) {
if (valuesList.get(i) == null) {
- nullMap.put(measurementsList.get(i), valuesList.get(i));
+ if (nullMap != null) {
+ nullMap.put(measurementsList.get(i), valuesList.get(i));
+ }
valuesList.remove(i);
measurementsList.remove(i);
types.remove(i);
}
}
if (valuesList.isEmpty()) {
- logger.info("All values of the {} are null,null values are {}",
deviceId, nullMap);
+ if (nullMap != null) {
+ logger.info("All values of the {} are null,null values are {}",
deviceId, nullMap);
+ }
return true;
- } else {
+ } else if (nullMap != null) {
logger.info("Some values of {} are null,null values are {}", deviceId,
nullMap);
}
return false;
@@ -1733,18 +1721,22 @@ public class Session implements ISession {
*/
private boolean filterNullValueAndMeasurementWithStringType(
List<String> valuesList, String deviceId, List<String> measurementsList)
{
- Map<String, Object> nullMap = new HashMap<>();
+ Map<String, Object> nullMap = logger.isInfoEnabled() ? new HashMap<>() :
null;
for (int i = valuesList.size() - 1; i >= 0; i--) {
if (valuesList.get(i) == null) {
- nullMap.put(measurementsList.get(i), valuesList.get(i));
+ if (nullMap != null) {
+ nullMap.put(measurementsList.get(i), valuesList.get(i));
+ }
valuesList.remove(i);
measurementsList.remove(i);
}
}
if (valuesList.isEmpty()) {
- logger.info("All values of the {} are null,null values are {}",
deviceId, nullMap);
+ if (nullMap != null) {
+ logger.info("All values of the {} are null,null values are {}",
deviceId, nullMap);
+ }
return true;
- } else {
+ } else if (nullMap != null) {
logger.info("Some values of {} are null,null values are {}", deviceId,
nullMap);
}
return false;
@@ -2481,7 +2473,57 @@ public class Session implements ISession {
* @return ordered list
*/
private static <T> List<T> sortList(List<T> source, Integer[] index) {
- return Arrays.stream(index).map(source::get).collect(Collectors.toList());
+ List<T> sortedList = new ArrayList<>(index.length);
+ for (int position : index) {
+ sortedList.add(source.get(position));
+ }
+ return sortedList;
+ }
+
+ private static List<Integer> toDataTypeOrdinals(List<TSDataType> dataTypes) {
+ List<Integer> ordinals = new ArrayList<>(dataTypes.size());
+ for (TSDataType dataType : dataTypes) {
+ ordinals.add(dataType.ordinal());
+ }
+ return ordinals;
+ }
+
+ private static List<Integer> toEncodingOrdinals(List<TSEncoding> encodings) {
+ List<Integer> ordinals = new ArrayList<>(encodings.size());
+ for (TSEncoding encoding : encodings) {
+ ordinals.add(encoding.ordinal());
+ }
+ return ordinals;
+ }
+
+ private static List<Integer> toCompressionOrdinals(List<CompressionType>
compressors) {
+ List<Integer> ordinals = new ArrayList<>(compressors.size());
+ for (CompressionType compression : compressors) {
+ ordinals.add((int) compression.serialize());
+ }
+ return ordinals;
+ }
+
+ private static List<String> replaceNullStrings(List<String> values) {
+ if (values == null) {
+ return null;
+ }
+ List<String> replacedValues = new ArrayList<>(values.size());
+ for (String value : values) {
+ replacedValues.add(value != null ? value : "");
+ }
+ return replacedValues;
+ }
+
+ private static List<Map<String, String>> replaceNullMaps(List<Map<String,
String>> values) {
+ if (values == null) {
+ return null;
+ }
+ List<Map<String, String>> replacedValues = new ArrayList<>(values.size());
+ for (Map<String, String> value : values) {
+ replacedValues.add(value != null ? value : new HashMap<>());
+ }
+ return replacedValues;
}
private List<ByteBuffer> objectValuesListToByteBufferList(
@@ -3523,10 +3565,9 @@ public class Session implements ISession {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(measurementsPath);
-
req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
-
req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
- req.setCompressors(
- compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
+ req.setDataTypes(toDataTypeOrdinals(dataTypes));
+ req.setEncodings(toEncodingOrdinals(encodings));
+ req.setCompressors(toCompressionOrdinals(compressors));
req.setIsAligned(true);
defaultSessionConnection.appendSchemaTemplate(req);
}
@@ -3569,10 +3610,9 @@ public class Session implements ISession {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(measurementsPath);
-
req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
-
req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
- req.setCompressors(
- compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
+ req.setDataTypes(toDataTypeOrdinals(dataTypes));
+ req.setEncodings(toEncodingOrdinals(encodings));
+ req.setCompressors(toCompressionOrdinals(compressors));
req.setIsAligned(false);
defaultSessionConnection.appendSchemaTemplate(req);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
index ecbb2d3aea2..9bcb020b72f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
@@ -165,7 +165,7 @@ public class DeviceSchemaCache {
// the un-related paths being cleared, like "root.*.b.c.**" affects
// "root.*.d.c.**", thereby lower the query performance.
dualKeyCache.update(
- cachedDeviceID -> cachedDeviceID.matchFullPath(devicePath),
updateFunction);
+ cachedDeviceID -> devicePath.matchFullPath(cachedDeviceID),
updateFunction);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index 9b766b4c514..1e185e40cda 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -55,6 +55,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -365,13 +366,15 @@ public class SchemaEngine {
return schemaRegionMap.size();
}
- public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer>
schemaIds) {
- Map<Integer, Long> deviceNum = new HashMap<>();
+ public Map<Integer, Long> countDeviceNumBySchemaRegion(final List<Integer>
schemaIds) {
+ final Map<Integer, Long> deviceNum = new HashMap<>();
+ final Collection<Integer> targetSchemaIds =
+ schemaIds.size() > 1 ? new HashSet<>(schemaIds) : schemaIds;
schemaRegionMap.entrySet().stream()
.filter(
entry ->
- schemaIds.contains(entry.getKey().getId())
+ targetSchemaIds.contains(entry.getKey().getId())
&&
SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey()))
.forEach(
entry ->
@@ -381,12 +384,14 @@ public class SchemaEngine {
return deviceNum;
}
- public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer>
schemaIds) {
- Map<Integer, Long> timeSeriesNum = new HashMap<>();
+ public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(final
List<Integer> schemaIds) {
+ final Map<Integer, Long> timeSeriesNum = new HashMap<>();
+ final Collection<Integer> targetSchemaIds =
+ schemaIds.size() > 1 ? new HashSet<>(schemaIds) : schemaIds;
schemaRegionMap.entrySet().stream()
.filter(
entry ->
- schemaIds.contains(entry.getKey().getId())
+ targetSchemaIds.contains(entry.getKey().getId())
&&
SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey()))
.forEach(
entry ->
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index c8439e02183..1c772cdb20b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -90,7 +90,9 @@ import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1038,12 +1040,14 @@ public class StorageEngine implements IService {
public void getDiskSizeByDataRegion(
Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) {
- dataRegionMap.forEach(
- (dataRegionId, dataRegion) -> {
- if (dataRegionIds.contains(dataRegionId.getId())) {
- dataRegionDisk.put(dataRegionId.getId(),
dataRegion.countRegionDiskSize());
- }
- });
+ final Collection<Integer> targetDataRegionIds =
+ dataRegionIds.size() > 1 ? new HashSet<>(dataRegionIds) :
dataRegionIds;
+ for (Integer dataRegionId : targetDataRegionIds) {
+ final DataRegion dataRegion = dataRegionMap.get(new
DataRegionId(dataRegionId));
+ if (dataRegion != null) {
+ dataRegionDisk.put(dataRegionId, dataRegion.countRegionDiskSize());
+ }
+ }
}
public static File getDataRegionSystemDir(String dataBaseName, String
dataRegionId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
index 055b8517b48..1552b7d81fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
@@ -53,11 +53,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class ClosedFileScanHandleImpl implements IFileScanHandle {
@@ -65,13 +65,16 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
private final TsFileResource tsFileResource;
private final QueryContext queryContext;
private PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
curFileMods = null;
+ // Used to cache the device-level modifications
+ private final Map<IDeviceID, List<TimeRange>> deviceToDeletionRanges;
// Used to cache the modifications of each timeseries
private final Map<IDeviceID, Map<String, List<TimeRange>>>
deviceToModifications;
public ClosedFileScanHandleImpl(TsFileResource tsFileResource, QueryContext
context) {
this.tsFileResource = tsFileResource;
this.queryContext = context;
- this.deviceToModifications = new HashMap<>();
+ this.deviceToDeletionRanges = new ConcurrentHashMap<>();
+ this.deviceToModifications = new ConcurrentHashMap<>();
}
@Override
@@ -89,14 +92,26 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
curFileMods != null
? curFileMods
: queryContext.loadAllModificationsFromDisk(tsFileResource);
- List<Modification> modifications =
queryContext.getPathModifications(curFileMods, deviceID);
- List<TimeRange> timeRangeList =
- modifications.stream()
- .filter(Deletion.class::isInstance)
- .map(Deletion.class::cast)
- .map(Deletion::getTimeRange)
- .collect(Collectors.toList());
- return ModificationUtils.isPointDeletedWithoutOrderedRange(timestamp,
timeRangeList);
+ List<TimeRange> timeRangeList = deviceToDeletionRanges.get(deviceID);
+ if (timeRangeList == null) {
+ final List<TimeRange> computedTimeRangeList =
+ getMergedTimeRanges(queryContext.getPathModifications(curFileMods,
deviceID));
+ final List<TimeRange> existingTimeRangeList =
+ deviceToDeletionRanges.putIfAbsent(deviceID, computedTimeRangeList);
+ timeRangeList = existingTimeRangeList == null ? computedTimeRangeList :
existingTimeRangeList;
+ }
+ return ModificationUtils.isPointDeleted(timestamp, timeRangeList);
+ }
+
+ private static List<TimeRange> getMergedTimeRanges(List<Modification>
modifications) {
+ List<TimeRange> timeRangeList = new ArrayList<>(modifications.size());
+ for (Modification modification : modifications) {
+ if (modification instanceof Deletion) {
+ timeRangeList.add(((Deletion) modification).getTimeRange());
+ }
+ }
+ TimeRange.sortAndMerge(timeRangeList);
+ return timeRangeList;
}
@Override
@@ -114,15 +129,9 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
: queryContext.loadAllModificationsFromDisk(tsFileResource);
List<Modification> modifications =
queryContext.getPathModifications(curFileMods, new
PartialPath(deviceID, timeSeriesName));
- List<TimeRange> timeRangeList =
- modifications.stream()
- .filter(Deletion.class::isInstance)
- .map(Deletion.class::cast)
- .map(Deletion::getTimeRange)
- .collect(Collectors.toList());
- TimeRange.sortAndMerge(timeRangeList);
+ List<TimeRange> timeRangeList = getMergedTimeRanges(modifications);
deviceToModifications
- .computeIfAbsent(deviceID, k -> new HashMap<>())
+ .computeIfAbsent(deviceID, k -> new ConcurrentHashMap<>())
.put(timeSeriesName, timeRangeList);
return ModificationUtils.isPointDeleted(timestamp, timeRangeList);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
index 20ae0958a87..ef74c0e4336 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator;
+import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -39,15 +40,19 @@ import org.apache.tsfile.read.common.TimeRange;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class UnclosedFileScanHandleImpl implements IFileScanHandle {
private final TsFileResource tsFileResource;
private final Map<IDeviceID, Map<String, List<IChunkMetadata>>>
deviceToChunkMetadataMap;
private final Map<IDeviceID, Map<String, List<IChunkHandle>>>
deviceToMemChunkHandleMap;
+ private final Map<IDeviceID, List<TimeRange>> deviceToDeletionRanges;
+ private final Map<IDeviceID, Map<String, List<TimeRange>>>
deviceToTimeSeriesDeletionRanges;
public UnclosedFileScanHandleImpl(
Map<IDeviceID, Map<String, List<IChunkMetadata>>>
deviceToChunkMetadataMap,
@@ -56,6 +61,8 @@ public class UnclosedFileScanHandleImpl implements
IFileScanHandle {
this.deviceToChunkMetadataMap = deviceToChunkMetadataMap;
this.deviceToMemChunkHandleMap = deviceToMemChunkHandleMap;
this.tsFileResource = tsFileResource;
+ this.deviceToDeletionRanges = new ConcurrentHashMap<>();
+ this.deviceToTimeSeriesDeletionRanges = new ConcurrentHashMap<>();
}
@Override
@@ -68,19 +75,9 @@ public class UnclosedFileScanHandleImpl implements
IFileScanHandle {
@Override
public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timeArray) {
- Map<String, List<IChunkMetadata>> chunkMetadataMap =
deviceToChunkMetadataMap.get(deviceID);
- for (List<IChunkMetadata> chunkMetadataList : chunkMetadataMap.values()) {
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (chunkMetadata.getDeleteIntervalList() != null) {
- for (TimeRange deleteInterval :
chunkMetadata.getDeleteIntervalList()) {
- if (deleteInterval.contains(timeArray)) {
- return true;
- }
- }
- }
- }
- }
- return false;
+ List<TimeRange> deletionRanges =
+ deviceToDeletionRanges.computeIfAbsent(deviceID,
this::collectDeviceDeletionRanges);
+ return ModificationUtils.isPointDeleted(timeArray, deletionRanges);
}
@Override
@@ -121,19 +118,13 @@ public class UnclosedFileScanHandleImpl implements
IFileScanHandle {
@Override
public boolean isTimeSeriesTimeDeleted(
IDeviceID deviceID, String timeSeriesName, long timestamp) {
- List<IChunkMetadata> chunkMetadataList =
- deviceToChunkMetadataMap.get(deviceID).get(timeSeriesName);
- // check if timestamp is deleted by deleteInterval
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (chunkMetadata.getDeleteIntervalList() != null) {
- for (TimeRange deleteInterval : chunkMetadata.getDeleteIntervalList())
{
- if (deleteInterval.contains(timestamp)) {
- return true;
- }
- }
- }
- }
- return false;
+ Map<String, List<TimeRange>> timeSeriesDeletionRanges =
+ deviceToTimeSeriesDeletionRanges.computeIfAbsent(
+ deviceID, key -> new ConcurrentHashMap<>());
+ List<TimeRange> deletionRanges =
+ timeSeriesDeletionRanges.computeIfAbsent(
+ timeSeriesName, key -> collectTimeSeriesDeletionRanges(deviceID,
key));
+ return ModificationUtils.isPointDeleted(timestamp, deletionRanges);
}
@Override
@@ -167,4 +158,44 @@ public class UnclosedFileScanHandleImpl implements
IFileScanHandle {
public TsFileResource getTsResource() {
return tsFileResource;
}
+
+ private List<TimeRange> collectDeviceDeletionRanges(IDeviceID deviceID) {
+ Map<String, List<IChunkMetadata>> chunkMetadataMap =
deviceToChunkMetadataMap.get(deviceID);
+ if (chunkMetadataMap == null || chunkMetadataMap.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<TimeRange> deletionRanges = new ArrayList<>();
+ for (List<IChunkMetadata> chunkMetadataList : chunkMetadataMap.values()) {
+ appendDeletionRanges(deletionRanges, chunkMetadataList);
+ }
+ TimeRange.sortAndMerge(deletionRanges);
+ return deletionRanges;
+ }
+
+ private List<TimeRange> collectTimeSeriesDeletionRanges(
+ IDeviceID deviceID, String timeSeriesName) {
+ Map<String, List<IChunkMetadata>> chunkMetadataMap =
deviceToChunkMetadataMap.get(deviceID);
+ if (chunkMetadataMap == null) {
+ return Collections.emptyList();
+ }
+ List<IChunkMetadata> chunkMetadataList =
chunkMetadataMap.get(timeSeriesName);
+ if (chunkMetadataList == null || chunkMetadataList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<TimeRange> deletionRanges = new ArrayList<>();
+ appendDeletionRanges(deletionRanges, chunkMetadataList);
+ TimeRange.sortAndMerge(deletionRanges);
+ return deletionRanges;
+ }
+
+ private void appendDeletionRanges(
+ List<TimeRange> deletionRanges, List<IChunkMetadata> chunkMetadataList) {
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ if (chunkMetadata.getDeleteIntervalList() != null) {
+ for (TimeRange deletionRange : chunkMetadata.getDeleteIntervalList()) {
+ deletionRanges.add(new TimeRange(deletionRange.getMin(),
deletionRange.getMax()));
+ }
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 1c831534991..be7052536ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -191,14 +191,7 @@ public class TsFileManager {
public void remove(TsFileResource tsFileResource, boolean sequence) {
writeLock("remove");
try {
- Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles :
unsequenceFiles;
- for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet())
{
- if (entry.getValue().contains(tsFileResource)) {
- entry.getValue().remove(tsFileResource);
-
TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
- break;
- }
- }
+ removeFromPartitionFileList(tsFileResource, sequence);
} finally {
writeUnlock();
}
@@ -208,10 +201,18 @@ public class TsFileManager {
writeLock("removeAll");
try {
for (TsFileResource resource : tsFileResourceList) {
- remove(resource, sequence);
+ removeFromPartitionFileList(resource, sequence);
}
} finally {
- writeLock("removeAll");
+ writeUnlock();
+ }
+ }
+
+ private void removeFromPartitionFileList(TsFileResource tsFileResource,
boolean sequence) {
+ Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles :
unsequenceFiles;
+ TsFileResourceList tsFileResources =
selectedMap.get(tsFileResource.getTimePartition());
+ if (tsFileResources != null && tsFileResources.remove(tsFileResource)) {
+ TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index a9f28e24e0a..982ac2a3e22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -79,7 +79,11 @@ public class ModificationUtils {
if (range.contains(metaData.getStartTime(),
metaData.getEndTime())) {
return true;
} else {
- if (range.overlaps(new TimeRange(metaData.getStartTime(),
metaData.getEndTime()))) {
+ if (overlap(
+ metaData.getStartTime(),
+ metaData.getEndTime(),
+ range.getMin(),
+ range.getMax())) {
metaData.setModified(true);
}
}
@@ -135,9 +139,11 @@ public class ModificationUtils {
currentRemoved = true;
break;
} else {
- if (range.overlaps(
- new TimeRange(
- valueChunkMetadata.getStartTime(),
valueChunkMetadata.getEndTime()))) {
+ if (overlap(
+ valueChunkMetadata.getStartTime(),
+ valueChunkMetadata.getEndTime(),
+ range.getMin(),
+ range.getMax())) {
valueChunkMetadata.setModified(true);
modified = true;
}
@@ -209,6 +215,11 @@ public class ModificationUtils {
return isPointDeleted(timestamp, deletionList, deleteCursor);
}
+ // Both ranges are closed.
+ public static boolean overlap(long startA, long endA, long startB, long
endB) {
+ return endB >= startA && startB <= endA;
+ }
+
/**
* Check whether the device with start time and end time is completely
deleted by mods or not.
* There are some slight differences from that in {@link SettleSelectorImpl}.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index dec3864353c..90e405ebdea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -30,15 +30,14 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.write.UnSupportedDataTypeException;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -57,14 +56,13 @@ public class QueryDataSetUtils {
// indicate whether it is a null
int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new
DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new
ByteArrayOutputStream[columnNumWithTime];
+ PublicBAOS[] byteArrayOutputStreams = new PublicBAOS[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ byteArrayOutputStreams[i] = new PublicBAOS();
dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
}
int rowCount = 0;
- int[] valueOccupation = new int[columnNum];
// used to record a bitmap for every 8 points
int[] bitmaps = new int[columnNum];
@@ -77,14 +75,7 @@ public class QueryDataSetUtils {
TsBlock tsBlock = optionalTsBlock.get();
if (!tsBlock.isEmpty()) {
int currentCount = tsBlock.getPositionCount();
- serializeTsBlock(
- rowCount,
- currentCount,
- tsBlock,
- columnNum,
- dataOutputStreams,
- valueOccupation,
- bitmaps);
+ serializeTsBlock(rowCount, currentCount, tsBlock, columnNum,
dataOutputStreams, bitmaps);
rowCount += currentCount;
}
}
@@ -93,9 +84,9 @@ public class QueryDataSetUtils {
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
- fillTimeColumn(rowCount, byteArrayOutputStreams, tsQueryDataSet);
+ fillTimeColumn(byteArrayOutputStreams, tsQueryDataSet);
- fillValueColumnsAndBitMaps(rowCount, byteArrayOutputStreams,
valueOccupation, tsQueryDataSet);
+ fillValueColumnsAndBitMaps(byteArrayOutputStreams, tsQueryDataSet);
return new Pair<>(tsQueryDataSet, finished);
}
@@ -109,14 +100,13 @@ public class QueryDataSetUtils {
int columnNum = 1;
int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new
DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new
ByteArrayOutputStream[columnNumWithTime];
+ PublicBAOS[] byteArrayOutputStreams = new PublicBAOS[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ byteArrayOutputStreams[i] = new PublicBAOS();
dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
}
int rowCount = 0;
- int[] valueOccupation = new int[columnNum];
// used to record a bitmap for every 8 points
int[] bitmaps = new int[columnNum];
@@ -150,7 +140,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeInt(column.getInt(i));
- valueOccupation[k] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -168,7 +157,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeLong(column.getLong(i));
- valueOccupation[k] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -185,7 +173,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeFloat(column.getFloat(i));
- valueOccupation[k] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -202,7 +189,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeDouble(column.getDouble(i));
- valueOccupation[k] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -219,7 +205,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeBoolean(column.getBoolean(i));
- valueOccupation[k] += 1;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -240,7 +225,6 @@ public class QueryDataSetUtils {
Binary binary = column.getBinary(i);
dataOutputStream.writeInt(binary.getLength());
dataOutputStream.write(binary.getValues());
- valueOccupation[k] = valueOccupation[k] + 4 +
binary.getLength();
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -268,27 +252,14 @@ public class QueryDataSetUtils {
}
// calculate the time buffer size
- int timeOccupation = rowCount * 8;
- ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
- timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
- timeBuffer.flip();
- tsQueryDataSet.setTime(timeBuffer);
+ tsQueryDataSet.setTime(wrapBuffer(byteArrayOutputStreams[0]));
// calculate the bitmap buffer size
- int bitmapOccupation = (rowCount + 7) / 8;
-
- List<ByteBuffer> bitmapList = new LinkedList<>();
- List<ByteBuffer> valueList = new LinkedList<>();
+ List<ByteBuffer> bitmapList = new ArrayList<>(columnNum);
+ List<ByteBuffer> valueList = new ArrayList<>(columnNum);
for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) /
2]);
- valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
- valueBuffer.flip();
- valueList.add(valueBuffer);
-
- ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
- bitmapBuffer.flip();
- bitmapList.add(bitmapBuffer);
+ valueList.add(wrapBuffer(byteArrayOutputStreams[i]));
+ bitmapList.add(wrapBuffer(byteArrayOutputStreams[i + 1]));
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
@@ -301,7 +272,6 @@ public class QueryDataSetUtils {
TsBlock tsBlock,
int columnNum,
DataOutputStream[] dataOutputStreams,
- int[] valueOccupation,
int[] bitmaps)
throws IOException {
// serialize time column
@@ -321,67 +291,27 @@ public class QueryDataSetUtils {
switch (type) {
case INT32:
case DATE:
- doWithInt32Column(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithInt32Column(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case INT64:
case TIMESTAMP:
- doWithInt64Column(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithInt64Column(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case FLOAT:
- doWithFloatColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithFloatColumn(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case DOUBLE:
doWithDoubleColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case BOOLEAN:
doWithBooleanColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case TEXT:
case BLOB:
case STRING:
- doWithTextColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithTextColumn(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
default:
throw new UnSupportedDataTypeException(
@@ -396,7 +326,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -406,7 +335,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeInt(column.getInt(i));
- valueOccupation[columnIndex] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -422,7 +350,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -432,7 +359,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeLong(column.getLong(i));
- valueOccupation[columnIndex] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -448,7 +374,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -458,7 +383,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeFloat(column.getFloat(i));
- valueOccupation[columnIndex] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -474,7 +398,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -484,7 +407,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeDouble(column.getDouble(i));
- valueOccupation[columnIndex] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -500,7 +422,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -510,7 +431,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeBoolean(column.getBoolean(i));
- valueOccupation[columnIndex] += 1;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -526,7 +446,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -538,7 +457,6 @@ public class QueryDataSetUtils {
Binary binary = column.getBinary(i);
dataOutputStream.writeInt(binary.getLength());
dataOutputStream.write(binary.getValues());
- valueOccupation[columnIndex] = valueOccupation[columnIndex] + 4 +
binary.getLength();
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -562,40 +480,27 @@ public class QueryDataSetUtils {
}
private static void fillTimeColumn(
- int rowCount, ByteArrayOutputStream[] byteArrayOutputStreams,
TSQueryDataSet tsQueryDataSet) {
- // calculate the time buffer size
- int timeOccupation = rowCount * 8;
- ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
- timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
- timeBuffer.flip();
- tsQueryDataSet.setTime(timeBuffer);
+ PublicBAOS[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) {
+ tsQueryDataSet.setTime(wrapBuffer(byteArrayOutputStreams[0]));
}
private static void fillValueColumnsAndBitMaps(
- int rowCount,
- ByteArrayOutputStream[] byteArrayOutputStreams,
- int[] valueOccupation,
- TSQueryDataSet tsQueryDataSet) {
- // calculate the bitmap buffer size
- int bitmapOccupation = (rowCount + 7) / 8;
-
- List<ByteBuffer> bitmapList = new LinkedList<>();
- List<ByteBuffer> valueList = new LinkedList<>();
+ PublicBAOS[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) {
+ int columnNum = byteArrayOutputStreams.length / 2;
+ List<ByteBuffer> bitmapList = new ArrayList<>(columnNum);
+ List<ByteBuffer> valueList = new ArrayList<>(columnNum);
for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) /
2]);
- valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
- valueBuffer.flip();
- valueList.add(valueBuffer);
-
- ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
- bitmapBuffer.flip();
- bitmapList.add(bitmapBuffer);
+ valueList.add(wrapBuffer(byteArrayOutputStreams[i]));
+ bitmapList.add(wrapBuffer(byteArrayOutputStreams[i + 1]));
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
}
+ private static ByteBuffer wrapBuffer(PublicBAOS outputStream) {
+ return ByteBuffer.wrap(outputStream.getBuf(), 0, outputStream.size());
+ }
+
/**
* To fetch required amounts of data and combine them through List
*
@@ -654,10 +559,8 @@ public class QueryDataSetUtils {
for (int i = 0; i < columns; i++) {
boolean hasBitMap = BytesUtils.byteToBool(buffer.get());
if (hasBitMap) {
- byte[] bytes = new byte[size / Byte.SIZE + 1];
- for (int j = 0; j < bytes.length; j++) {
- bytes[j] = buffer.get();
- }
+ byte[] bytes = new byte[getBitmapByteSize(size)];
+ buffer.get(bytes);
bitMaps[i] = new BitMap(size, bytes);
}
}
@@ -673,16 +576,18 @@ public class QueryDataSetUtils {
for (int i = 0; i < columns; i++) {
boolean hasBitMap = BytesUtils.byteToBool(stream.readByte());
if (hasBitMap) {
- byte[] bytes = new byte[size / Byte.SIZE + 1];
- for (int j = 0; j < bytes.length; j++) {
- bytes[j] = stream.readByte();
- }
+ byte[] bytes = new byte[getBitmapByteSize(size)];
+ stream.readFully(bytes);
bitMaps[i] = new BitMap(size, bytes);
}
}
return Optional.of(bitMaps);
}
+ private static int getBitmapByteSize(int size) {
+ return size / Byte.SIZE + 1;
+ }
+
public static Object[] readTabletValuesFromBuffer(
ByteBuffer buffer, List<Integer> types, int columns, int size) {
TSDataType[] dataTypes = new TSDataType[types.size()];
@@ -850,11 +755,7 @@ public class QueryDataSetUtils {
for (int index = 0; index < size; index++) {
int binarySize = stream.readInt();
byte[] binaryValue = new byte[binarySize];
- int actualReadSize = stream.read(binaryValue);
- if (actualReadSize != binarySize) {
- throw new IllegalStateException(
- "Expect to read " + binarySize + " bytes, actually read " +
actualReadSize + "bytes.");
- }
+ stream.readFully(binaryValue);
binaryValues[index] = new Binary(binaryValue);
}
values[columnIndex] = binaryValues;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 144f2396ef7..bf15d216e67 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -267,6 +267,43 @@ public class DataNodeSchemaCacheTest {
Assert.assertEquals(0,
dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage());
}
+ @Test
+ public void testInvalidateLastCacheByWildcardDevicePath() throws
IllegalPathException {
+ final MeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT32);
+ final PartialPath device0 = new PartialPath("root.sg1.d1");
+ final PartialPath device1 = new PartialPath("root.sg2.d1");
+ final MeasurementPath path0 = new
MeasurementPath(device0.concatNode("s0"), s0);
+ final MeasurementPath path1 = new
MeasurementPath(device1.concatNode("s0"), s0);
+ final TimeValuePair tv0 = new TimeValuePair(0L, new
TsPrimitiveType.TsInt(0));
+
+ updateLastCache("root.sg1", device0, path0, s0, tv0);
+ updateLastCache("root.sg2", device1, path1, s0, tv0);
+
+ Assert.assertEquals(tv0, dataNodeSchemaCache.getLastCache(path0));
+ Assert.assertEquals(tv0, dataNodeSchemaCache.getLastCache(path1));
+
+ dataNodeSchemaCache.invalidateLastCache(new
MeasurementPath("root.sg1.*.s0"));
+
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(path0));
+ Assert.assertEquals(tv0, dataNodeSchemaCache.getLastCache(path1));
+ }
+
+ private void updateLastCache(
+ final String database,
+ final PartialPath devicePath,
+ final MeasurementPath path,
+ final MeasurementSchema schema,
+ final TimeValuePair timeValuePair) {
+ dataNodeSchemaCache.declareLastCache(database, path);
+ dataNodeSchemaCache.updateLastCacheIfExists(
+ database,
+ devicePath,
+ new String[] {path.getMeasurement()},
+ new TimeValuePair[] {timeValuePair},
+ false,
+ new MeasurementSchema[] {schema});
+ }
+
@Test
public void testPut() throws Exception {
final ClusterSchemaTree clusterSchemaTree = new ClusterSchemaTree();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
index 31dda893358..4b4a5329f8a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
@@ -157,6 +157,9 @@ public class LimitOperatorTest {
int count = 0;
while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
TsBlock tsBlock = limitOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);