This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 181b3177774 [IOTDB-5879] Pipe: logical view of pipe event (#9855)
181b3177774 is described below
commit 181b317777484eba467906678d4870cd5e56bffd
Author: Itami Sho <[email protected]>
AuthorDate: Thu Jun 1 11:35:07 2023 +0800
[IOTDB-5879] Pipe: logical view of pipe event (#9855)
Co-authored-by: Steve Yurong Su <[email protected]>
Co-authored-by: yschengzi <[email protected]>
---
.../java/org/apache/iotdb/pipe/api/access/Row.java | 38 +-
.../event/dml/insertion/TabletInsertionEvent.java | 9 -
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 4 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 9 +
.../iotdb/commons/conf/CommonDescriptor.java | 14 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 4 +
.../builtin/processor/DoNothingProcessor.java | 13 +-
.../commons/pipe/utils/PipeBinaryTransformer.java | 19 +-
.../pipe/utils/PipeDataTypeTransformer.java | 94 +++++
.../PipeHistoricalDataRegionTsFileCollector.java | 15 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 39 +-
.../impl/iotdb/v1/IoTDBThriftReceiverV1.java | 59 ++-
.../connector/impl/iotdb/v1/PipeRequestType.java | 5 +-
.../iotdb/v1/request/PipeTransferTabletReq.java | 222 +++++++++++
.../iotdb/db/pipe/core/event/EnrichedEvent.java | 18 +-
...vent.java => PipeInsertNodeInsertionEvent.java} | 64 ++-
.../core/event/impl/PipeTabletInsertionEvent.java | 111 ++----
.../core/event/impl/PipeTsFileInsertionEvent.java | 30 +-
.../event/realtime/PipeRealtimeCollectEvent.java | 19 +-
.../realtime/PipeRealtimeCollectEventFactory.java | 6 +-
.../core/event/realtime/TsFileEpochManager.java | 12 +-
.../db/pipe/core/event/view/access/PipeRow.java | 96 +++--
.../core/event/view/access/PipeRowIterator.java | 60 ---
.../event/view/collector/PipeRowCollector.java | 45 ++-
.../TabletInsertionDataContainer.java | 430 +++++++++++++++++++++
.../TsFileInsertionDataContainer.java | 136 +++++++
.../TsFileInsertionDataTabletIterator.java | 287 ++++++++++++++
.../core/processor/PipeDoNothingProcessor.java | 44 ++-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 12 +-
.../collector/CachedSchemaPatternMatcherTest.java | 4 +-
.../apache/iotdb/tsfile/write/record/Tablet.java | 108 +++---
.../iotdb/tsfile/write/record/TabletTest.java | 14 +-
33 files changed, 1690 insertions(+), 352 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
index 54782637484..c27ed608e2e 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
@@ -34,7 +34,7 @@ public interface Row {
*
* @return timestamp
*/
- long getTime() throws IOException;
+ long getTime();
/**
* Returns the int value at the specified column in this row.
@@ -44,7 +44,7 @@ public interface Row {
* @param columnIndex index of the specified column
* @return the int value at the specified column in this row
*/
- int getInt(int columnIndex) throws IOException;
+ int getInt(int columnIndex);
/**
* Returns the long value at the specified column in this row.
@@ -64,7 +64,7 @@ public interface Row {
* @param columnIndex index of the specified column
* @return the float value at the specified column in this row
*/
- float getFloat(int columnIndex) throws IOException;
+ float getFloat(int columnIndex);
/**
* Returns the double value at the specified column in this row.
@@ -74,7 +74,7 @@ public interface Row {
* @param columnIndex index of the specified column
* @return the double value at the specified column in this row
*/
- double getDouble(int columnIndex) throws IOException;
+ double getDouble(int columnIndex);
/**
* Returns the boolean value at the specified column in this row.
@@ -84,7 +84,7 @@ public interface Row {
* @param columnIndex index of the specified column
* @return the boolean value at the specified column in this row
*/
- boolean getBoolean(int columnIndex) throws IOException;
+ boolean getBoolean(int columnIndex);
/**
* Returns the Binary value at the specified column in this row.
@@ -94,7 +94,7 @@ public interface Row {
* @param columnIndex index of the specified column
* @return the Binary value at the specified column in this row
*/
- Binary getBinary(int columnIndex) throws IOException;
+ Binary getBinary(int columnIndex);
/**
* Returns the String value at the specified column in this row.
@@ -104,7 +104,15 @@ public interface Row {
* @param columnIndex index of the specified column
* @return the String value at the specified column in this row
*/
- String getString(int columnIndex) throws IOException;
+ String getString(int columnIndex);
+
+ /**
+ * Returns the Object value at the specified column in this row.
+ *
+ * @param columnIndex index of the specified column
+ * @return the Object value at the specified column in this row
+ */
+ Object getObject(int columnIndex);
/**
* Returns the actual data type of the value at the specified column in this row.
@@ -123,9 +131,9 @@ public interface Row {
boolean isNull(int columnIndex);
/**
- * Returns the number of columns
+ * Returns the number of columns (excluding the timestamp column)
*
- * @return the number of columns
+ * @return the number of columns (excluding the timestamp column)
*/
int size();
@@ -139,16 +147,16 @@ public interface Row {
int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
/**
- * Returns the column names in the Row
+ * Returns the column data types in the Row
*
- * @return the column names in the Row
+ * @return the column data types in the Row
*/
- List<Path> getColumnNames();
+ List<Type> getColumnTypes();
/**
- * Returns the column data types in the Row
+ * Returns the device id of the Row
*
- * @return the column data types in the Row
+ * @return the device id of the Row
*/
- List<Type> getColumnTypes();
+ String getDeviceId();
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 9fd6d89428c..4a8073a5a26 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.tsfile.write.record.Tablet;
-import java.util.Iterator;
import java.util.function.BiConsumer;
/** TabletInsertionEvent is used to define the event of data insertion. */
@@ -38,14 +37,6 @@ public interface TabletInsertionEvent extends Event {
*/
TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer);
- /**
- * The consumer processes the data by the Iterator and collects the results by RowCollector.
- *
- * @return TabletInsertionEvent a new TabletInsertionEvent contains the results collected by the
- * RowCollector
- */
- TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>,
RowCollector> consumer);
-
/**
* The consumer processes the Tablet directly and collects the results by RowCollector.
*
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 79871f857af..e3092d072fa 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -214,7 +214,9 @@ public enum TSStatusCode {
PIPE_TYPE_ERROR(1801),
PIPE_HANDSHAKE_ERROR(1802),
PIPE_TRANSFER_FILE_OFFSET_RESET(1803),
- PIPE_TRANSFER_FILE_ERROR(1804);
+ PIPE_TRANSFER_FILE_ERROR(1804),
+ PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR(1805),
+ ;
private final int statusCode;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e5a9fae2d8d..5c7c91545ee 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -154,6 +154,7 @@ public class CommonConfig {
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
private int pipeConnectorPendingQueueSize = 1024;
+ private long pipeConnectorSessionId = Long.MAX_VALUE / 2;
private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -549,4 +550,12 @@ public class CommonConfig {
this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs =
pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
+
+ public long getPipeConnectorSessionId() {
+ return pipeConnectorSessionId;
+ }
+
+ public void setPipeConnectorSessionId(long pipeSessionId) {
+ this.pipeConnectorSessionId = pipeSessionId;
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index bd72a56ae88..e45c4af45c5 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -238,13 +238,13 @@ public class CommonDescriptor {
String.valueOf(
config.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
"pipe_subtask_executor_basic_check_point_interval_by_time_duration",
String.valueOf(
config.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration()))));
config.setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
@@ -276,7 +276,7 @@ public class CommonDescriptor {
"pipe_connector_read_file_buffer_size",
String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
config.setPipeConnectorRetryIntervalMs(
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
"pipe_connector_retry_interval_ms",
String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
@@ -285,6 +285,10 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+ config.setPipeConnectorSessionId(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_connector_session_id",
String.valueOf(config.getPipeConnectorSessionId()))));
config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
Integer.parseInt(
@@ -292,12 +296,12 @@ public class CommonDescriptor {
"pipe_heartbeat_loop_cycles_for_collecting_pipe_meta",
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta()))));
config.setPipeMetaSyncerInitialSyncDelayMinutes(
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
"pipe_meta_syncer_initial_sync_delay_minutes",
String.valueOf(config.getPipeMetaSyncerInitialSyncDelayMinutes()))));
config.setPipeMetaSyncerSyncIntervalMinutes(
- Integer.parseInt(
+ Long.parseLong(
properties.getProperty(
"pipe_meta_syncer_sync_interval_minutes",
String.valueOf(config.getPipeMetaSyncerSyncIntervalMinutes()))));
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 365ea0b8003..af0fcd379ff 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -85,6 +85,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
}
+ public long getPipeConnectorSessionId() {
+ return COMMON_CONFIG.getPipeConnectorSessionId();
+ }
+
/////////////////////////////// Meta Consistency ///////////////////////////////
public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index bc56a8bb3cf..d083783a716 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -30,38 +30,39 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.IOException;
+/** This class is a placeholder and should not be used. */
public class DoNothingProcessor implements PipeProcessor {
@Override
public void validate(PipeParameterValidator validator) {
- // do nothing
+ throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
public void customize(
PipeParameters parameters, PipeProcessorRuntimeConfiguration
configuration) {
- // do nothing
+ throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
throws IOException {
- eventCollector.collect(tabletInsertionEvent);
+ throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent,
EventCollector eventCollector)
throws IOException {
- eventCollector.collect(tsFileInsertionEvent);
+ throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
public void process(Event event, EventCollector eventCollector) throws
IOException {
- eventCollector.collect(event);
+ throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
public void close() {
- // do nothing
+ throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeBinaryTransformer.java
similarity index 60%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeBinaryTransformer.java
index 687c3e72c14..125de0e9206 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeBinaryTransformer.java
@@ -17,15 +17,16 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event.view.collector;
+package org.apache.iotdb.commons.pipe.utils;
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
+public class PipeBinaryTransformer {
+ public static org.apache.iotdb.tsfile.utils.Binary transformToBinary(
+ org.apache.iotdb.pipe.api.type.Binary binary) {
+ return binary == null ? null : new
org.apache.iotdb.tsfile.utils.Binary(binary.getValues());
+ }
-import java.io.IOException;
-
-public class PipeRowCollector implements RowCollector {
-
- @Override
- public void collectRow(Row row) throws IOException {}
+ public static org.apache.iotdb.pipe.api.type.Binary transformToPipeBinary(
+ org.apache.iotdb.tsfile.utils.Binary binary) {
+ return binary == null ? null : new
org.apache.iotdb.pipe.api.type.Binary(binary.getValues());
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeDataTypeTransformer.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeDataTypeTransformer.java
new file mode 100644
index 00000000000..7e810f41c28
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeDataTypeTransformer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.utils;
+
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transform between {@link org.apache.iotdb.tsfile.file.metadata.enums.TSDataType} and {@link
+ * org.apache.iotdb.pipe.api.type.Type}
+ */
+public class PipeDataTypeTransformer {
+ public static Type transformToPipeDataType(TSDataType tsDataType) {
+ return tsDataType == null ? null : getPipeDataType(tsDataType.getType());
+ }
+
+ public static List<Type> transformToPipeDataTypeList(List<TSDataType>
tsDataTypeList) {
+ return tsDataTypeList == null
+ ? null
+ : tsDataTypeList.stream()
+ .map(PipeDataTypeTransformer::transformToPipeDataType)
+ .collect(Collectors.toList());
+ }
+
+ private static Type getPipeDataType(byte type) {
+ switch (type) {
+ case 0:
+ return Type.BOOLEAN;
+ case 1:
+ return Type.INT32;
+ case 2:
+ return Type.INT64;
+ case 3:
+ return Type.FLOAT;
+ case 4:
+ return Type.DOUBLE;
+ case 5:
+ return Type.TEXT;
+ default:
+ throw new IllegalArgumentException("Invalid input: " + type);
+ }
+ }
+
+ public static TSDataType transformToTsDataType(Type type) {
+ return type == null ? null : getTsDataType(type.getType());
+ }
+
+ public static List<TSDataType> transformToTsDataTypeList(List<Type>
typeList) {
+ return typeList == null
+ ? null
+ : typeList.stream()
+ .map(PipeDataTypeTransformer::transformToTsDataType)
+ .collect(Collectors.toList());
+ }
+
+ private static TSDataType getTsDataType(byte type) {
+ switch (type) {
+ case 0:
+ return TSDataType.BOOLEAN;
+ case 1:
+ return TSDataType.INT32;
+ case 2:
+ return TSDataType.INT64;
+ case 3:
+ return TSDataType.FLOAT;
+ case 4:
+ return TSDataType.DOUBLE;
+ case 5:
+ return TSDataType.TEXT;
+ default:
+ throw new IllegalArgumentException("Invalid input: " + type);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index c1157d96979..a1742f9fce3 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -152,7 +153,12 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
- .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta))
+ .map(
+ resource ->
+ new PipeTsFileInsertionEvent(
+ resource,
+ pipeTaskMeta,
+
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
@@ -161,7 +167,12 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
- .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta))
+ .map(
+ resource ->
+ new PipeTsFileInsertionEvent(
+ resource,
+ pipeTaskMeta,
+
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
.collect(Collectors.toList()));
pendingQueue.forEach(
event ->
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 3e264ce02e9..4b7c0cf8dbf 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -55,7 +55,7 @@ public class PipeDataRegionAssigner {
collector -> {
final PipeRealtimeCollectEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- collector.getPipeTaskMeta());
+ collector.getPipeTaskMeta(), collector.getPattern());
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
collector.collect(copiedEvent);
});
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6f7db5bbf97..40c156e3285 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -32,6 +32,8 @@ import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.exception.WALPipeException;
@@ -113,15 +115,16 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
- // TODO: support more TabletInsertionEvent
// PipeProcessor can change the type of TabletInsertionEvent
- if (!(tabletInsertionEvent instanceof PipeTabletInsertionEvent)) {
- throw new NotImplementedException(
- "IoTDBThriftConnectorV1 only support PipeTabletInsertionEvent.");
- }
-
try {
- doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+ if (tabletInsertionEvent instanceof PipeInsertNodeInsertionEvent) {
+ doTransfer((PipeInsertNodeInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) {
+ doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+ } else {
+ throw new NotImplementedException(
+ "IoTDBThriftConnectorV1 only support PipeInsertNodeInsertionEvent
and PipeTabletInsertionEvent.");
+ }
} catch (TException e) {
LOGGER.error(
"Network error when transfer tablet insertion event: {}.",
tabletInsertionEvent, e);
@@ -133,23 +136,37 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
}
}
- private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+ private void doTransfer(PipeInsertNodeInsertionEvent
pipeInsertNodeInsertionEvent)
throws PipeException, TException, WALPipeException {
final TPipeTransferResp resp =
client.pipeTransfer(
-
PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
+ PipeTransferInsertNodeReq.toTPipeTransferReq(
+ pipeInsertNodeInsertionEvent.getInsertNode()));
+
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Transfer PipeInsertNodeInsertionEvent %s error, result status
%s",
+ pipeInsertNodeInsertionEvent, resp.status));
+ }
+ }
+
+ private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+ throws PipeException, TException, IOException {
+ final TPipeTransferResp resp =
+ client.pipeTransfer(
+
PipeTransferTabletReq.toTPipeTransferReq(pipeTabletInsertionEvent.convertToTablet()));
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
- "Transfer tablet insertion event %s error, result status %s",
+ "Transfer PipeTabletInsertionEvent %s error, result status %s",
pipeTabletInsertionEvent, resp.status));
}
}
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
- // TODO: support more TsFileInsertionEvent
// PipeProcessor can change the type of TabletInsertionEvent
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
throw new NotImplementedException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
index ea1f221d035..b7fcb594c20 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
@@ -35,6 +36,7 @@ import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -69,6 +71,9 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
case TRANSFER_INSERT_NODE:
return handleTransferInsertNode(
PipeTransferInsertNodeReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
+ case TRANSFER_TABLET:
+ return handleTransferTablet(
+ PipeTransferTabletReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
case TRANSFER_FILE_PIECE:
return
handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
case TRANSFER_FILE_SEAL:
@@ -109,26 +114,13 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
executeStatement(req.constructStatement(), partitionFetcher,
schemaFetcher));
}
- private TSStatus executeStatement(
- Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
- final long queryId = SessionManager.getInstance().requestQueryId();
- final ExecutionResult result =
- Coordinator.getInstance()
- .execute(
- statement,
- queryId,
- null,
- "",
- partitionFetcher,
- schemaFetcher,
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
- if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "failed to execute statement, statement: {}, result status is: {}",
- statement,
- result.status);
- }
- return result.status;
+ private TPipeTransferResp handleTransferTablet(
+ PipeTransferTabletReq req, IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
+ InsertTabletStatement statement = req.constructStatement();
+ return new TPipeTransferResp(
+ statement.isEmpty()
+ ? RpcUtils.SUCCESS_STATUS
+ : executeStatement(statement, partitionFetcher, schemaFetcher));
}
private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq
req) {
@@ -258,6 +250,33 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
return writingFile != null && writingFile.exists() && writingFileWriter !=
null;
}
+ private TSStatus executeStatement(
+ Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
+ if (statement == null) {
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null
statement.");
+ }
+
+ final long queryId = SessionManager.getInstance().requestQueryId();
+ final ExecutionResult result =
+ Coordinator.getInstance()
+ .execute(
+ statement,
+ queryId,
+ null,
+ "",
+ partitionFetcher,
+ schemaFetcher,
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "failed to execute statement, statement: {}, result status is: {}",
+ statement,
+ result.status);
+ }
+ return result.status;
+ }
+
@Override
public synchronized void handleExit() {
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
index 982243cd87e..c62034353d0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
@@ -27,9 +27,10 @@ public enum PipeRequestType {
HANDSHAKE((short) 1),
TRANSFER_INSERT_NODE((short) 2),
+ TRANSFER_TABLET((short) 3),
- TRANSFER_FILE_PIECE((short) 3),
- TRANSFER_FILE_SEAL((short) 4),
+ TRANSFER_FILE_PIECE((short) 4),
+ TRANSFER_FILE_SEAL((short) 5),
;
private final short type;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferTabletReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferTabletReq.java
new file mode 100644
index 00000000000..ad54dd267e5
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferTabletReq.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class PipeTransferTabletReq extends TPipeTransferReq {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletReq.class);
+ private Tablet tablet;
+
+ /**
+  * Builds a transfer request that carries the given tablet.
+  *
+  * <p>The request is stamped with the VERSION_ONE connector version and the TRANSFER_TABLET
+  * request type, and its body is the tablet's serialized form.
+  *
+  * @param tablet the tablet to ship
+  * @return the populated request (a PipeTransferTabletReq that also keeps the tablet itself)
+  * @throws IOException declared by this method; presumably raised by tablet serialization
+  */
+ public static TPipeTransferReq toTPipeTransferReq(Tablet tablet) throws IOException {
+   final PipeTransferTabletReq request = new PipeTransferTabletReq();
+   request.tablet = tablet;
+
+   // Thrift envelope fields.
+   request.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+   request.type = PipeRequestType.TRANSFER_TABLET.getType();
+   request.body = tablet.serialize();
+
+   return request;
+ }
+
+ /**
+  * Tells whether the first {@code rowSize} timestamps of the tablet are in non-decreasing order.
+  *
+  * @param tablet the tablet to inspect
+  * @return false as soon as a timestamp is smaller than its predecessor, true otherwise
+  */
+ private static boolean checkSorted(Tablet tablet) {
+   int row = 1;
+   while (row < tablet.rowSize) {
+     final long previous = tablet.timestamps[row - 1];
+     final long current = tablet.timestamps[row];
+     if (previous > current) {
+       return false;
+     }
+     row++;
+   }
+   return true;
+ }
+
+ /**
+  * Reorders the rows of the tablet in place so that its timestamps are ascending.
+  *
+  * <p>A row permutation is computed from the timestamps and then applied to every value column
+  * and to every bitmap that is present, so each row keeps its own values after the sort.
+  *
+  * @param tablet the tablet whose timestamps, values and bitmaps are rewritten
+  */
+ public static void sortTablet(Tablet tablet) {
+   // The permutation must be derived while the timestamps are still in their original order.
+   final Integer[] order = new Integer[tablet.rowSize];
+   Arrays.setAll(order, row -> row);
+   Arrays.sort(order, Comparator.comparingLong(row -> tablet.timestamps[row]));
+   Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
+
+   int column = 0;
+   for (IMeasurementSchema schema : tablet.getSchemas()) {
+     if (schema instanceof MeasurementSchema) {
+       sortColumn(tablet, column, schema.getType(), order);
+       column++;
+       continue;
+     }
+     // Any other schema kind contributes one value column per sub-measurement.
+     final int subMeasurementCount = schema.getSubMeasurementsList().size();
+     for (int sub = 0; sub < subMeasurementCount; sub++) {
+       sortColumn(tablet, column, schema.getSubMeasurementsTSDataTypeList().get(sub), order);
+       column++;
+     }
+   }
+ }
+
+ /** Applies the row permutation to one value column and, when present, to its bitmap. */
+ private static void sortColumn(Tablet tablet, int column, TSDataType type, Integer[] order) {
+   tablet.values[column] = sortList(tablet.values[column], type, order);
+   if (tablet.bitMaps != null && tablet.bitMaps[column] != null) {
+     tablet.bitMaps[column] = sortBitMap(tablet.bitMaps[column], order);
+   }
+ }
+
+ /**
+  * Produces a reordered copy of one value column.
+  *
+  * @param valueList the column's backing array; it is cast to the array type matching dataType
+  * @param dataType data type of the column
+  * @param index row permutation: slot i of the result receives the element at index[i]
+  * @return a new array with the same length as the input, whose first index.length slots are
+  *     filled according to the permutation
+  * @throws UnSupportedDataTypeException if dataType is none of BOOLEAN, INT32, INT64, FLOAT,
+  *     DOUBLE or TEXT
+  */
+ private static Object sortList(Object valueList, TSDataType dataType, Integer[] index) {
+   final int rows = index.length;
+   switch (dataType) {
+     case BOOLEAN:
+       {
+         final boolean[] source = (boolean[]) valueList;
+         final boolean[] target = new boolean[source.length];
+         for (int row = 0; row < rows; row++) {
+           target[row] = source[index[row]];
+         }
+         return target;
+       }
+     case INT32:
+       {
+         final int[] source = (int[]) valueList;
+         final int[] target = new int[source.length];
+         for (int row = 0; row < rows; row++) {
+           target[row] = source[index[row]];
+         }
+         return target;
+       }
+     case INT64:
+       {
+         final long[] source = (long[]) valueList;
+         final long[] target = new long[source.length];
+         for (int row = 0; row < rows; row++) {
+           target[row] = source[index[row]];
+         }
+         return target;
+       }
+     case FLOAT:
+       {
+         final float[] source = (float[]) valueList;
+         final float[] target = new float[source.length];
+         for (int row = 0; row < rows; row++) {
+           target[row] = source[index[row]];
+         }
+         return target;
+       }
+     case DOUBLE:
+       {
+         final double[] source = (double[]) valueList;
+         final double[] target = new double[source.length];
+         for (int row = 0; row < rows; row++) {
+           target[row] = source[index[row]];
+         }
+         return target;
+       }
+     case TEXT:
+       {
+         final Binary[] source = (Binary[]) valueList;
+         final Binary[] target = new Binary[source.length];
+         for (int row = 0; row < rows; row++) {
+           target[row] = source[index[row]];
+         }
+         return target;
+       }
+     default:
+       throw new UnSupportedDataTypeException(
+           String.format("Data type %s is not supported.", dataType));
+   }
+ }
+
+ /**
+  * Produces a reordered copy of a bitmap.
+  *
+  * @param bitMap the bitmap to reorder; it is not modified
+  * @param index row permutation: bit i of the result is marked when bit index[i] of the input is
+  * @return a new bitmap of the same size as the input
+  */
+ private static BitMap sortBitMap(BitMap bitMap, Integer[] index) {
+   final BitMap reordered = new BitMap(bitMap.getSize());
+   int target = 0;
+   for (Integer source : index) {
+     if (bitMap.isMarked(source)) {
+       reordered.mark(target);
+     }
+     target++;
+   }
+   return reordered;
+ }
+
+ /**
+  * Turns the wrapped tablet into an insert-tablet statement.
+  *
+  * <p>The tablet is sorted by timestamp first if it is not already sorted. It is then packed
+  * into a TSInsertTabletReq (always flagged as non-aligned) from which the statement is
+  * generated.
+  *
+  * @return the generated statement, or null if a MetadataException is raised while building it
+  */
+ public InsertTabletStatement constructStatement() {
+   final boolean alreadySorted = checkSorted(tablet);
+   if (!alreadySorted) {
+     sortTablet(tablet);
+   }
+
+   try {
+     final TSInsertTabletReq insertReq = new TSInsertTabletReq();
+
+     for (IMeasurementSchema schema : tablet.getSchemas()) {
+       insertReq.addToMeasurements(schema.getMeasurementId());
+       insertReq.addToTypes(schema.getType().ordinal());
+     }
+
+     insertReq.setPrefixPath(tablet.deviceId);
+     insertReq.setIsAligned(false);
+     insertReq.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+     insertReq.setValues(SessionUtils.getValueBuffer(tablet));
+     insertReq.setSize(tablet.rowSize);
+
+     // Replace the measurement names with the checked (and possibly updated) ones.
+     final List<String> checkedMeasurements =
+         PathUtils.checkIsLegalSingleMeasurementsAndUpdate(insertReq.getMeasurements());
+     insertReq.setMeasurements(checkedMeasurements);
+
+     return StatementGenerator.createStatement(insertReq);
+   } catch (MetadataException e) {
+     LOGGER.warn(String.format("Generate Statement from tablet %s error.", tablet), e);
+     return null;
+   }
+ }
+
+ /**
+  * Rebuilds a tablet request from a generic transfer request.
+  *
+  * <p>The tablet is deserialized from the request body; version, type and body are copied over
+  * unchanged.
+  *
+  * @param transferReq the received transfer request
+  * @return a PipeTransferTabletReq holding the deserialized tablet
+  */
+ public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+   final PipeTransferTabletReq rebuilt = new PipeTransferTabletReq();
+
+   rebuilt.tablet = Tablet.deserialize(transferReq.body);
+
+   // Mirror the thrift envelope of the incoming request.
+   rebuilt.version = transferReq.version;
+   rebuilt.type = transferReq.type;
+   rebuilt.body = transferReq.body;
+
+   return rebuilt;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index 9fcc5fd1d90..e972104ce5d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,13 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger;
* additional information mainly includes the reference count of the event.
*/
public abstract class EnrichedEvent implements Event {
+
private final AtomicInteger referenceCount;
private final PipeTaskMeta pipeTaskMeta;
- public EnrichedEvent(PipeTaskMeta pipeTaskMeta) {
+ private final String pattern;
+
+ public EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
referenceCount = new AtomicInteger(0);
this.pipeTaskMeta = pipeTaskMeta;
+ this.pattern = pattern;
}
/**
@@ -114,8 +119,17 @@ public abstract class EnrichedEvent implements Event {
return referenceCount.get();
}
+ /**
+  * Returns the pattern bound to this event, falling back to the collector's default pattern
+  * when no pattern was supplied at construction time.
+  *
+  * @return the stored pattern, or COLLECTOR_PATTERN_DEFAULT_VALUE if the stored pattern is null
+  */
+ public final String getPattern() {
+   if (pattern != null) {
+     return pattern;
+   }
+   return PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
+ }
+
public abstract EnrichedEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta);
+ PipeTaskMeta pipeTaskMeta, String pattern);
public void reportException(PipeRuntimeException pipeRuntimeException) {
PipeAgent.runtime().report(this.pipeTaskMeta, pipeRuntimeException);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
similarity index 65%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
copy to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
index fb9788d1a95..3b980f10b9d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
@@ -23,34 +23,41 @@ import
org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.function.BiConsumer;
-public class PipeTabletInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
+public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeInsertNodeInsertionEvent.class);
private final WALEntryHandler walEntryHandler;
private final ProgressIndex progressIndex;
- public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler,
ProgressIndex progressIndex) {
- this(walEntryHandler, progressIndex, null);
+ private TabletInsertionDataContainer dataContainer;
+
+ public PipeInsertNodeInsertionEvent(
+ WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
+ this(walEntryHandler, progressIndex, null, null);
}
- private PipeTabletInsertionEvent(
- WALEntryHandler walEntryHandler, ProgressIndex progressIndex,
PipeTaskMeta pipeTaskMeta) {
- super(pipeTaskMeta);
+ private PipeInsertNodeInsertionEvent(
+ WALEntryHandler walEntryHandler,
+ ProgressIndex progressIndex,
+ PipeTaskMeta pipeTaskMeta,
+ String pattern) {
+ super(pipeTaskMeta, pattern);
this.walEntryHandler = walEntryHandler;
this.progressIndex = progressIndex;
}
@@ -97,26 +104,49 @@ public class PipeTabletInsertionEvent extends
EnrichedEvent implements TabletIns
}
@Override
- public PipeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta) {
- return new PipeTabletInsertionEvent(walEntryHandler, progressIndex,
pipeTaskMeta);
+ public PipeInsertNodeInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ PipeTaskMeta pipeTaskMeta, String pattern) {
+ return new PipeInsertNodeInsertionEvent(walEntryHandler, progressIndex,
pipeTaskMeta, pattern);
}
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(getInsertNode(),
getPattern());
+ }
+ return dataContainer.processRowByRow(consumer);
+ } catch (Exception e) {
+ LOGGER.error("Process row by row error.", e);
+ throw new PipeException("Process row by row error.", e);
+ }
}
@Override
- public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>,
RowCollector> consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
+ public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ try {
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(getInsertNode(),
getPattern());
+ }
+ return dataContainer.processTablet(consumer);
+ } catch (Exception e) {
+ LOGGER.error("Process tablet error.", e);
+ throw new PipeException("Process tablet error.", e);
+ }
}
- @Override
- public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
+ /**
+  * Converts the data of this event into a Tablet.
+  *
+  * <p>The data container is created on first use from the insert node and this event's pattern,
+  * and is reused by later calls.
+  *
+  * @return the tablet produced by the data container
+  * @throws PipeException if the container cannot be created or the conversion fails; the
+  *     original exception is attached as the cause
+  */
+ public Tablet convertToTablet() {
+   try {
+     if (dataContainer == null) {
+       dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern());
+     }
+     return dataContainer.convertToTablet();
+   } catch (Exception e) {
+     // Use a message specific to this operation so failures are not confused with processTablet.
+     LOGGER.error("Convert to tablet error.", e);
+     throw new PipeException("Convert to tablet error.", e);
+   }
 }
/////////////////////////// Object ///////////////////////////
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index fb9788d1a95..399b091264d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -19,115 +19,58 @@
package org.apache.iotdb.db.pipe.core.event.impl;
-import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.wal.exception.WALPipeException;
-import org.apache.iotdb.db.wal.utils.WALEntryHandler;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
+import java.util.Objects;
import java.util.function.BiConsumer;
-public class PipeTabletInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
-
- private final WALEntryHandler walEntryHandler;
- private final ProgressIndex progressIndex;
-
- public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler,
ProgressIndex progressIndex) {
- this(walEntryHandler, progressIndex, null);
- }
-
- private PipeTabletInsertionEvent(
- WALEntryHandler walEntryHandler, ProgressIndex progressIndex,
PipeTaskMeta pipeTaskMeta) {
- super(pipeTaskMeta);
- this.walEntryHandler = walEntryHandler;
- this.progressIndex = progressIndex;
- }
-
- public InsertNode getInsertNode() throws WALPipeException {
- return walEntryHandler.getValue();
- }
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
- /////////////////////////// EnrichedEvent ///////////////////////////
+ private final Tablet tablet;
+ private final String pattern;
- @Override
- public boolean increaseResourceReferenceCount(String holderMessage) {
- try {
- PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(),
walEntryHandler);
- return true;
- } catch (Exception e) {
- LOGGER.warn(
- String.format(
- "Increase reference count for memtable %d error. Holder Message:
%s",
- walEntryHandler.getMemTableId(), holderMessage),
- e);
- return false;
- }
- }
+ private TabletInsertionDataContainer dataContainer;
- @Override
- public boolean decreaseResourceReferenceCount(String holderMessage) {
- try {
- PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
- return true;
- } catch (Exception e) {
- LOGGER.warn(
- String.format(
- "Decrease reference count for memtable %d error. Holder Message:
%s",
- walEntryHandler.getMemTableId(), holderMessage),
- e);
- return false;
- }
+ public PipeTabletInsertionEvent(Tablet tablet) {
+ this(Objects.requireNonNull(tablet), null);
}
- @Override
- public ProgressIndex getProgressIndex() {
- return progressIndex;
+ public PipeTabletInsertionEvent(Tablet tablet, String pattern) {
+ this.tablet = Objects.requireNonNull(tablet);
+ this.pattern = pattern;
}
- @Override
- public PipeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta) {
- return new PipeTabletInsertionEvent(walEntryHandler, progressIndex,
pipeTaskMeta);
+ public String getPattern() {
+ return pattern == null ?
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
}
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- @Override
- public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>,
RowCollector> consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+ }
+ return dataContainer.processRowByRow(consumer);
}
@Override
public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+ }
+ return dataContainer.processTablet(consumer);
}
- /////////////////////////// Object ///////////////////////////
-
- @Override
- public String toString() {
- return "PipeTabletInsertionEvent{"
- + "walEntryHandler="
- + walEntryHandler
- + ", progressIndex="
- + progressIndex
- + '}';
+ public Tablet convertToTablet() {
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+ }
+ return dataContainer.convertToTablet();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 4b346b0f915..c813619be9a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -25,9 +25,11 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.core.event.view.datastructure.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,12 +46,15 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private final AtomicBoolean isClosed;
+ private TsFileInsertionDataContainer dataContainer;
+
public PipeTsFileInsertionEvent(TsFileResource resource) {
- this(resource, null);
+ this(resource, null, null);
}
- public PipeTsFileInsertionEvent(TsFileResource resource, PipeTaskMeta
pipeTaskMeta) {
- super(pipeTaskMeta);
+ public PipeTsFileInsertionEvent(
+ TsFileResource resource, PipeTaskMeta pipeTaskMeta, String pattern) {
+ super(pipeTaskMeta, pattern);
this.resource = resource;
tsFile = resource.getTsFile();
@@ -132,15 +137,28 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta) {
- return new PipeTsFileInsertionEvent(resource, pipeTaskMeta);
+ PipeTaskMeta pipeTaskMeta, String pattern) {
+ return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern);
}
/////////////////////////// TsFileInsertionEvent ///////////////////////////
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ if (dataContainer == null) {
+ waitForTsFileClose();
+ dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern());
+ }
+ return dataContainer.toTabletInsertionEvents();
+ } catch (InterruptedException e) {
+ String errorMsg =
+ String.format(
+ "Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath());
+ LOGGER.warn(errorMsg);
+ Thread.currentThread().interrupt();
+ throw new PipeException(errorMsg);
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index f15ef605617..d91eb07a4b0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -38,11 +38,14 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent
{
private Map<String, String[]> device2Measurements;
public PipeRealtimeCollectEvent(
- EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]>
device2Measurements) {
+ EnrichedEvent event,
+ TsFileEpoch tsFileEpoch,
+ Map<String, String[]> device2Measurements,
+ String pattern) {
// pipeTaskMeta is used to report the progress of the event, the
PipeRealtimeCollectEvent
// is only used in the realtime event collector, which does not need to
report the progress
// of the event, so the pipeTaskMeta is always null.
- super(null);
+ super(null, pattern);
this.event = event;
this.tsFileEpoch = tsFileEpoch;
@@ -53,11 +56,12 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent
{
EnrichedEvent event,
TsFileEpoch tsFileEpoch,
Map<String, String[]> device2Measurements,
- PipeTaskMeta pipeTaskMeta) {
+ PipeTaskMeta pipeTaskMeta,
+ String pattern) {
// pipeTaskMeta is used to report the progress of the event, the
PipeRealtimeCollectEvent
// is only used in the realtime event collector, which does not need to
report the progress
// of the event, so the pipeTaskMeta is always null.
- super(pipeTaskMeta);
+ super(pipeTaskMeta, pattern);
this.event = event;
this.tsFileEpoch = tsFileEpoch;
@@ -115,12 +119,13 @@ public class PipeRealtimeCollectEvent extends
EnrichedEvent {
@Override
public PipeRealtimeCollectEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- PipeTaskMeta pipeTaskMeta) {
+ PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeRealtimeCollectEvent(
-
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta),
+
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta,
pattern),
this.tsFileEpoch,
this.device2Measurements,
- pipeTaskMeta);
+ pipeTaskMeta,
+ pattern);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 937e3073a27..61d65cd43e8 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
@@ -36,8 +36,8 @@ public class PipeRealtimeCollectEventFactory {
public static PipeRealtimeCollectEvent createCollectEvent(
WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
- return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
- new PipeTabletInsertionEvent(walEntryHandler,
insertNode.getProgressIndex()),
+ return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeInsertionEvent(
+ new PipeInsertNodeInsertionEvent(walEntryHandler,
insertNode.getProgressIndex()),
insertNode,
resource);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index b7869778642..1dcc1fb9320 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.slf4j.Logger;
@@ -57,14 +57,16 @@ public class TsFileEpochManager {
// TsFileEpoch's life cycle
filePath2Epoch.remove(filePath),
resource.getDevices().stream()
- .collect(Collectors.toMap(device -> device, device ->
EMPTY_MEASUREMENT_ARRAY)));
+ .collect(Collectors.toMap(device -> device, device ->
EMPTY_MEASUREMENT_ARRAY)),
+ event.getPattern());
}
- public PipeRealtimeCollectEvent bindPipeTabletInsertionEvent(
- PipeTabletInsertionEvent event, InsertNode node, TsFileResource
resource) {
+ public PipeRealtimeCollectEvent bindPipeInsertNodeInsertionEvent(
+ PipeInsertNodeInsertionEvent event, InsertNode node, TsFileResource
resource) {
return new PipeRealtimeCollectEvent(
event,
filePath2Epoch.computeIfAbsent(resource.getTsFilePath(),
TsFileEpoch::new),
- Collections.singletonMap(node.getDevicePath().getFullPath(),
node.getMeasurements()));
+ Collections.singletonMap(node.getDevicePath().getFullPath(),
node.getMeasurements()),
+ event.getPattern());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index 6c9a5c55f8c..043be5ec83f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -19,84 +19,130 @@
package org.apache.iotdb.db.pipe.core.event.view.access;
+import org.apache.iotdb.commons.pipe.utils.PipeDataTypeTransformer;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.iotdb.pipe.api.type.Binary;
import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
public class PipeRow implements Row {
+ private final int rowIndex;
+
+ private final String deviceId;
+ private final MeasurementSchema[] measurementSchemaList;
+
+ private final long[] timestampColumn;
+ private final Object[][] valueColumns;
+ private final TSDataType[] valueColumnTypes;
+
+ private final String[] columnNameStringList;
+
+ public PipeRow(
+ int rowIndex,
+ String deviceId,
+ MeasurementSchema[] measurementSchemaList,
+ long[] timestampColumn,
+ Object[][] valueColumns,
+ TSDataType[] valueColumnTypes,
+ String[] columnNameStringList) {
+ this.rowIndex = rowIndex;
+ this.deviceId = deviceId;
+ this.measurementSchemaList = measurementSchemaList;
+ this.timestampColumn = timestampColumn;
+ this.valueColumns = valueColumns;
+ this.valueColumnTypes = valueColumnTypes;
+ this.columnNameStringList = columnNameStringList;
+ }
+
+ @Override
+ public long getTime() {
+ return timestampColumn[rowIndex];
+ }
+
@Override
- public long getTime() throws IOException {
- return 0;
+ public int getInt(int columnIndex) {
+ return (int) valueColumns[columnIndex][rowIndex];
}
@Override
- public int getInt(int columnIndex) throws IOException {
- return 0;
+ public long getLong(int columnIndex) {
+ return (long) valueColumns[columnIndex][rowIndex];
}
@Override
- public long getLong(int columnIndex) throws IOException {
- return 0;
+ public float getFloat(int columnIndex) {
+ return (float) valueColumns[columnIndex][rowIndex];
}
@Override
- public float getFloat(int columnIndex) throws IOException {
- return 0;
+ public double getDouble(int columnIndex) {
+ return (double) valueColumns[columnIndex][rowIndex];
}
@Override
- public double getDouble(int columnIndex) throws IOException {
- return 0;
+ public boolean getBoolean(int columnIndex) {
+ return (boolean) valueColumns[columnIndex][rowIndex];
}
@Override
- public boolean getBoolean(int columnIndex) throws IOException {
- return false;
+ public Binary getBinary(int columnIndex) {
+ return Binary.valueOf((String) valueColumns[columnIndex][rowIndex]);
}
@Override
- public Binary getBinary(int columnIndex) throws IOException {
- return null;
+ public String getString(int columnIndex) {
+ return (String) valueColumns[columnIndex][rowIndex];
}
@Override
- public String getString(int columnIndex) throws IOException {
- return null;
+ public Object getObject(int columnIndex) {
+ return valueColumns[columnIndex][rowIndex];
}
@Override
public Type getDataType(int columnIndex) {
- return null;
+ return
PipeDataTypeTransformer.transformToPipeDataType(valueColumnTypes[columnIndex]);
}
@Override
public boolean isNull(int columnIndex) {
- return false;
+ return valueColumns[columnIndex][rowIndex] == null;
}
@Override
public int size() {
- return 0;
+ return valueColumns.length;
}
@Override
public int getColumnIndex(Path columnName) throws
PipeParameterNotValidException {
- return 0;
+ for (int i = 0; i < columnNameStringList.length; i++) {
+ if (columnNameStringList[i].equals(columnName.getFullPath())) {
+ return i;
+ }
+ }
+ throw new PipeParameterNotValidException(
+ String.format("column %s not found", columnName.getFullPath()));
}
@Override
- public List<Path> getColumnNames() {
- return null;
+ public List<Type> getColumnTypes() {
+ return
PipeDataTypeTransformer.transformToPipeDataTypeList(Arrays.asList(valueColumnTypes));
}
@Override
- public List<Type> getColumnTypes() {
- return null;
+ public String getDeviceId() {
+ return deviceId;
+ }
+
+ public MeasurementSchema[] getMeasurementSchemaList() {
+ return measurementSchemaList;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
deleted file mode 100644
index 73ee4d041a4..00000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.view.access;
-
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.access.RowIterator;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-import org.apache.iotdb.pipe.api.type.Type;
-import org.apache.iotdb.tsfile.read.common.Path;
-
-import java.io.IOException;
-import java.util.List;
-
-public class PipeRowIterator implements RowIterator {
-
- @Override
- public boolean hasNextRow() {
- return false;
- }
-
- @Override
- public Row next() throws IOException {
- return null;
- }
-
- @Override
- public void reset() {}
-
- @Override
- public int getColumnIndex(Path columnName) throws
PipeParameterNotValidException {
- return 0;
- }
-
- @Override
- public List<Path> getColumnNames() {
- return null;
- }
-
- @Override
- public List<Type> getColumnTypes() {
- return null;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index 687c3e72c14..3d39e08c32a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,13 +19,54 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
public class PipeRowCollector implements RowCollector {
+ private Tablet tablet = null;
+
@Override
- public void collectRow(Row row) throws IOException {}
+ public void collectRow(Row row) {
+   // Only rows created by the pipe framework expose the device id and the measurement schemas
+   // needed to assemble a tablet, so any other Row implementation is rejected.
+   if (!(row instanceof PipeRow)) {
+     throw new PipeException("Row can not be customized");
+   }
+
+   final PipeRow source = (PipeRow) row;
+   final MeasurementSchema[] schemas = source.getMeasurementSchemaList();
+
+   // The buffer tablet is created lazily from the first collected row.
+   if (tablet == null) {
+     final String device = source.getDeviceId();
+     final List<MeasurementSchema> schemaList = new ArrayList<>(Arrays.asList(schemas));
+     tablet = new Tablet(device, schemaList);
+     tablet.initBitMaps();
+   }
+
+   // Append the row at the current end of the tablet.
+   final int targetRow = tablet.rowSize;
+   tablet.addTimestamp(targetRow, row.getTime());
+   for (int column = 0; column < row.size(); column++) {
+     final String measurementId = schemas[column].getMeasurementId();
+     tablet.addValue(measurementId, targetRow, row.getObject(column));
+     // Record null cells in the column's bitmap.
+     if (row.isNull(column)) {
+       tablet.bitMaps[column].mark(targetRow);
+     }
+   }
+   tablet.rowSize++;
+ }
+
+ /**
+  * Wraps the rows collected so far into a tablet insertion event and resets this collector, so
+  * that the next collected row starts a new tablet.
+  *
+  * <p>NOTE(review): if no row has been collected the event is built from a {@code null} tablet;
+  * confirm that {@code PipeTabletInsertionEvent} tolerates this.
+  *
+  * @return an event backed by the tablet assembled from the collected rows
+  */
+ public TabletInsertionEvent toTabletInsertionEvent() {
+   final Tablet collected = tablet;
+   final TabletInsertionEvent event = new PipeTabletInsertionEvent(collected);
+   tablet = null;
+   return event;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
new file mode 100644
index 00000000000..f61d4f623ad
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.view.datastructure;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
+import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.stream.IntStream;
+
+public class TabletInsertionDataContainer {
+
+ private String deviceId;
+ private MeasurementSchema[] measurementSchemaList;
+ private String[] columnNameStringList;
+
+ private long[] timestampColumn;
+ private Object[][] valueColumns;
+ private TSDataType[] valueColumnTypes;
+ private BitMap[] nullValueColumnBitmaps;
+ private int rowCount;
+
+ private Tablet tablet;
+
+ /**
+  * Builds a container from an insertion plan node, keeping only the columns matched by
+  * {@code pattern}.
+  *
+  * @param insertNode the node to load; an {@link InsertRowNode} or an {@link InsertTabletNode}
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  * @throws UnSupportedDataTypeException if {@code insertNode} is any other kind of node
+  * @throws PipeException if parsing fails with an {@link IllegalPathException}
+  */
+ public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
+   final boolean isRowNode = insertNode instanceof InsertRowNode;
+   final boolean isTabletNode = insertNode instanceof InsertTabletNode;
+   try {
+     if (isRowNode) {
+       parse((InsertRowNode) insertNode, pattern);
+       return;
+     }
+     if (isTabletNode) {
+       parse((InsertTabletNode) insertNode, pattern);
+       return;
+     }
+     throw new UnSupportedDataTypeException(
+         String.format("InsertNode type %s is not supported.", insertNode.getClass().getName()));
+   } catch (IllegalPathException e) {
+     throw new PipeException(
+         String.format("Failed to parse insertNode with pattern %s.", pattern), e);
+   }
+ }
+
+ /**
+  * Builds a container from an existing tablet, keeping only the columns matched by
+  * {@code pattern}.
+  *
+  * @param tablet the tablet whose device id, timestamps, values and bitmaps are loaded
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  */
+ public TabletInsertionDataContainer(Tablet tablet, String pattern) {
+ parse(tablet, pattern);
+ }
+
+ //////////////////////////// parse ////////////////////////////
+
+ /**
+  * Loads a single-row insertion node, keeping only the measurements matched by {@code pattern}.
+  *
+  * <p>Every kept column gets a one-slot value array and a one-bit null bitmap. The bit is marked
+  * when the node carries no value for that column, so that the null survives a later
+  * conversion to a tablet, where a primitive placeholder has to stand in for the missing value.
+  *
+  * @param insertRowNode the row insertion to load
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  * @throws IllegalPathException declared for the node accessors used while parsing
+  */
+ private void parse(InsertRowNode insertRowNode, String pattern) throws IllegalPathException {
+   final int originColumnSize = insertRowNode.getMeasurements().length;
+   // Slot i holds the filtered index of origin column i, or null when the column is dropped.
+   final Integer[] originColumnIndex2FilteredColumnIndexMapperList =
+       new Integer[originColumnSize];
+
+   this.deviceId = insertRowNode.getDevicePath().getFullPath();
+   this.timestampColumn = new long[] {insertRowNode.getTime()};
+
+   generateColumnIndexMapper(
+       insertRowNode, pattern, originColumnIndex2FilteredColumnIndexMapperList);
+   final int filteredColumnSize =
+       (int)
+           Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
+               .filter(Objects::nonNull)
+               .count();
+
+   this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
+   this.columnNameStringList = new String[filteredColumnSize];
+   this.valueColumns = new Object[filteredColumnSize][1];
+   this.valueColumnTypes = new TSDataType[filteredColumnSize];
+   this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
+
+   final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas();
+   final String[] originColumnNameStringList = insertRowNode.getMeasurements();
+   final Object[] originValueColumns = insertRowNode.getValues();
+   final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes();
+
+   for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
+     if (originColumnIndex2FilteredColumnIndexMapperList[i] == null) {
+       continue;
+     }
+
+     final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i];
+     this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
+     this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
+     this.valueColumns[filteredColumnIndex][0] = originValueColumns[i];
+     this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+
+     // Mark the single row as null when the node has no value for this column.
+     final BitMap nullBitMap = new BitMap(1);
+     if (originValueColumns[i] == null) {
+       nullBitMap.mark(0);
+     }
+     this.nullValueColumnBitmaps[filteredColumnIndex] = nullBitMap;
+   }
+
+   rowCount = 1;
+ }
+
+ /**
+  * Loads a multi-row insertion node, keeping only the measurements matched by
+  * {@code pattern}.
+  *
+  * <p>Kept columns are converted from the node's primitive column arrays to boxed arrays in
+  * which cells marked in the column bitmap become {@code null}.
+  *
+  * @param insertTabletNode the tablet insertion to load
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  * @throws IllegalPathException declared for the node accessors used while parsing
+  */
+ private void parse(InsertTabletNode insertTabletNode, String pattern)
+ throws IllegalPathException {
+ final int originColumnSize = insertTabletNode.getMeasurements().length;
+ // Slot i holds the filtered index of origin column i, or null when the column is dropped.
+ final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
+
+ this.deviceId = insertTabletNode.getDevicePath().getFullPath();
+ this.timestampColumn = insertTabletNode.getTimes();
+
+ generateColumnIndexMapper(
+ insertTabletNode, pattern,
originColumnIndex2FilteredColumnIndexMapperList);
+
+ // The number of non-null mapper slots is the number of columns that survived filtering.
+ final int filteredColumnSize =
+ Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
+ .filter(Objects::nonNull)
+ .toArray()
+ .length;
+
+ this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
+ this.columnNameStringList = new String[filteredColumnSize];
+ this.valueColumns = new Object[filteredColumnSize][];
+ this.valueColumnTypes = new TSDataType[filteredColumnSize];
+ this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
+
+ final MeasurementSchema[] originMeasurementSchemaList =
+ insertTabletNode.getMeasurementSchemas();
+ final String[] originColumnNameStringList =
insertTabletNode.getMeasurements();
+ final Object[] originValueColumns = insertTabletNode.getColumns();
+ final TSDataType[] originValueColumnTypes =
insertTabletNode.getDataTypes();
+ // When the node carries no bitmaps, substitute one empty bitmap per origin column, sized to
+ // the number of timestamps, so the per-column indexing below never hits a null array.
+ final BitMap[] originBitMapList =
+ (insertTabletNode.getBitMaps() == null
+ ? IntStream.range(0, originColumnSize)
+ .boxed()
+ .map(o -> new BitMap(timestampColumn.length))
+ .toArray(BitMap[]::new)
+ : insertTabletNode.getBitMaps());
+
+ // Copy every kept column into its filtered slot.
+ for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
+ if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
+ final int filteredColumnIndex =
originColumnIndex2FilteredColumnIndexMapperList[i];
+ this.measurementSchemaList[filteredColumnIndex] =
originMeasurementSchemaList[i];
+ this.columnNameStringList[filteredColumnIndex] =
originColumnNameStringList[i];
+ this.valueColumns[filteredColumnIndex] =
+ convertToColumn(originValueColumns[i], originValueColumnTypes[i],
originBitMapList[i]);
+ this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+ this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+ }
+ }
+
+ // The row count is taken from the length of the node's timestamp array.
+ rowCount = timestampColumn.length;
+ }
+
+ /**
+  * Loads an existing tablet, keeping only the measurements matched by {@code pattern}.
+  *
+  * <p>Kept columns are converted from the tablet's primitive column arrays to boxed arrays in
+  * which cells marked in the column bitmap become {@code null}. A tablet whose {@code bitMaps}
+  * array is {@code null} is treated as containing no null cells, mirroring how a tablet
+  * insertion node without bitmaps is handled.
+  *
+  * @param tablet the tablet to load
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  */
+ private void parse(Tablet tablet, String pattern) {
+   final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas();
+   final int originColumnSize = originMeasurementSchemaList.size();
+   // Slot i holds the filtered index of origin column i, or null when the column is dropped.
+   final Integer[] originColumnIndex2FilteredColumnIndexMapperList =
+       new Integer[originColumnSize];
+
+   this.deviceId = tablet.deviceId;
+   this.timestampColumn = tablet.timestamps;
+
+   generateColumnIndexMapper(tablet, pattern, originColumnIndex2FilteredColumnIndexMapperList);
+
+   final int filteredColumnSize =
+       (int)
+           Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
+               .filter(Objects::nonNull)
+               .count();
+
+   this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
+   this.columnNameStringList = new String[filteredColumnSize];
+   this.valueColumns = new Object[filteredColumnSize][];
+   this.valueColumnTypes = new TSDataType[filteredColumnSize];
+   this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
+
+   final String[] originColumnNameStringList = new String[originColumnSize];
+   final TSDataType[] originValueColumnTypes = new TSDataType[originColumnSize];
+   for (int i = 0; i < originColumnSize; i++) {
+     originColumnNameStringList[i] = originMeasurementSchemaList.get(i).getMeasurementId();
+     originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
+   }
+   final Object[] originValueColumns = tablet.values;
+
+   // The tablet's bitmap array may be absent (presumably until Tablet#initBitMaps() has been
+   // called - confirm against the Tablet class); substitute empty bitmaps instead of failing
+   // with a NullPointerException when indexing it below.
+   BitMap[] originBitMapList = tablet.bitMaps;
+   if (originBitMapList == null) {
+     originBitMapList = new BitMap[originColumnSize];
+     for (int i = 0; i < originColumnSize; i++) {
+       originBitMapList[i] = new BitMap(timestampColumn.length);
+     }
+   }
+
+   // Copy every kept column into its filtered slot.
+   for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
+     if (originColumnIndex2FilteredColumnIndexMapperList[i] == null) {
+       continue;
+     }
+
+     final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i];
+     this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i);
+     this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
+     this.valueColumns[filteredColumnIndex] =
+         convertToColumn(originValueColumns[i], originValueColumnTypes[i], originBitMapList[i]);
+     this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+     this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+   }
+
+   rowCount = tablet.rowSize;
+ }
+
+ /**
+  * Fills the origin-to-filtered column index mapping for the current {@code deviceId}.
+  *
+  * <p>Slots of columns that are not matched by {@code pattern} are left untouched.
+  *
+  * @param originMeasurementList measurement names of the unfiltered columns
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  * @param originColumnIndex2FilteredColumnIndexMapperList output array, indexed by origin
+  *     column, receiving the index each kept column has after filtering
+  */
+ private void generateColumnIndexMapper(
+     String[] originMeasurementList,
+     String pattern,
+     Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
+   final int columnCount = originMeasurementList.length;
+
+   // Case 1: no pattern, or the pattern is a prefix of the device id (e.g. pattern root.a.b and
+   // device root.a.b.c). Every column is kept and no measurement has to be inspected.
+   final boolean deviceCoveredByPattern =
+       pattern == null
+           || (pattern.length() <= deviceId.length() && deviceId.startsWith(pattern));
+   if (deviceCoveredByPattern) {
+     for (int column = 0; column < columnCount; column++) {
+       originColumnIndex2FilteredColumnIndexMapperList[column] = column;
+     }
+     return;
+   }
+
+   // Case 2 only applies when the pattern strictly extends the device id (e.g. pattern
+   // root.a.b.c and device root.a.b); otherwise nothing matches.
+   if (pattern.length() <= deviceId.length() || !pattern.startsWith(deviceId)) {
+     return;
+   }
+
+   int nextFilteredIndex = 0;
+   for (int column = 0; column < columnCount; column++) {
+     final String measurement = originMeasurementList[column];
+
+     // The cheap length comparison runs before the string comparisons.
+     final boolean lengthMatches =
+         pattern.length() == deviceId.length() + measurement.length() + 1;
+     if (lengthMatches
+         && pattern.startsWith(deviceId)
+         && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+       originColumnIndex2FilteredColumnIndexMapperList[column] = nextFilteredIndex++;
+     }
+   }
+ }
+
+ /**
+  * Fills the column index mapping for an insertion node by delegating to the
+  * measurement-name based overload with the node's measurement names.
+  *
+  * @param insertNode node whose measurement names identify the unfiltered columns
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  * @param originColumnIndex2FilteredColumnIndexMapperList output array, indexed by origin
+  *     column, receiving the index each kept column has after filtering
+  */
+ private void generateColumnIndexMapper(
+ InsertNode insertNode,
+ String pattern,
+ Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
+ generateColumnIndexMapper(
+ insertNode.getMeasurements(), pattern,
originColumnIndex2FilteredColumnIndexMapperList);
+ }
+
+ /**
+  * Fills the column index mapping for a tablet by extracting the measurement id of every
+  * schema and delegating to the measurement-name based overload.
+  *
+  * @param tablet tablet whose schemas identify the unfiltered columns
+  * @param pattern path prefix or full series path used to filter columns; may be {@code null}
+  * @param originColumnIndex2FilteredColumnIndexMapperList output array, indexed by origin
+  *     column, receiving the index each kept column has after filtering
+  */
+ private void generateColumnIndexMapper(
+     Tablet tablet, String pattern, Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
+   final String[] measurementIds =
+       tablet.getSchemas().stream()
+           .map(MeasurementSchema::getMeasurementId)
+           .toArray(String[]::new);
+   generateColumnIndexMapper(
+       measurementIds, pattern, originColumnIndex2FilteredColumnIndexMapperList);
+ }
+
+ /**
+  * Converts one primitive column array into a boxed array in which null cells are explicit.
+  *
+  * <p>A cell becomes {@code null} when {@code bitMap} is non-null and marks its index. TEXT
+  * columns are converted from {@code Binary} to {@code String}; a {@code null} {@code Binary}
+  * also yields {@code null}.
+  *
+  * @param originColumn the primitive (or {@code Binary[]}) array matching {@code dataType}
+  * @param dataType the column's data type, which selects the array type to cast to
+  * @param bitMap null markers of the column; may be {@code null}, meaning no cell is null
+  * @return a boxed array of the same length as {@code originColumn}
+  * @throws UnSupportedDataTypeException if {@code dataType} is none of the handled types
+  */
+ private Object[] convertToColumn(Object originColumn, TSDataType dataType,
BitMap bitMap) {
+ switch (dataType) {
+ case INT32:
+ final int[] intValues = (int[]) originColumn;
+ final Integer[] integerValues = new Integer[intValues.length];
+ for (int i = 0; i < intValues.length; i++) {
+ integerValues[i] = bitMap != null && bitMap.isMarked(i) ? null :
intValues[i];
+ }
+ return integerValues;
+ case INT64:
+ final long[] longValues = (long[]) originColumn;
+ final Long[] longValues2 = new Long[longValues.length];
+ for (int i = 0; i < longValues.length; i++) {
+ longValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
longValues[i];
+ }
+ return longValues2;
+ case FLOAT:
+ final float[] floatValues = (float[]) originColumn;
+ final Float[] floatValues2 = new Float[floatValues.length];
+ for (int i = 0; i < floatValues.length; i++) {
+ floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
floatValues[i];
+ }
+ return floatValues2;
+ case DOUBLE:
+ final double[] doubleValues = (double[]) originColumn;
+ final Double[] doubleValues2 = new Double[doubleValues.length];
+ for (int i = 0; i < doubleValues.length; i++) {
+ doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
doubleValues[i];
+ }
+ return doubleValues2;
+ case BOOLEAN:
+ final boolean[] booleanValues = (boolean[]) originColumn;
+ final Boolean[] booleanValues2 = new Boolean[booleanValues.length];
+ for (int i = 0; i < booleanValues.length; i++) {
+ booleanValues2[i] = bitMap != null && bitMap.isMarked(i) ? null :
booleanValues[i];
+ }
+ return booleanValues2;
+ case TEXT:
+ // TEXT cells are exposed as String rather than Binary.
+ final Binary[] binaryValues = (Binary[]) originColumn;
+ final String[] stringValues = new String[binaryValues.length];
+ for (int i = 0; i < binaryValues.length; i++) {
+ stringValues[i] =
+ bitMap != null && bitMap.isMarked(i)
+ ? null
+ : (binaryValues[i] == null ? null :
binaryValues[i].getStringValue());
+ }
+ return stringValues;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ }
+
+ //////////////////////////// process ////////////////////////////
+
+ /**
+  * Feeds every row of this container to {@code consumer}, together with a collector, and
+  * returns the rows the consumer collected as a new event.
+  *
+  * <p>Only the first {@code rowCount} entries are visited. The timestamp array can be longer
+  * than the number of rows actually written (a tablet's timestamp buffer, for example), and
+  * the surplus slots do not hold inserted data.
+  *
+  * @param consumer callback invoked once per row, in row order
+  * @return an event built from the rows the consumer passed to the collector
+  */
+ public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+   final PipeRowCollector rowCollector = new PipeRowCollector();
+   for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+     final Row row =
+         new PipeRow(
+             rowIndex,
+             deviceId,
+             measurementSchemaList,
+             timestampColumn,
+             valueColumns,
+             valueColumnTypes,
+             columnNameStringList);
+     consumer.accept(row, rowCollector);
+   }
+   return rowCollector.toTabletInsertionEvent();
+ }
+
+ /**
+  * Hands the whole container to {@code consumer} as a single tablet, together with a
+  * collector, and returns the rows the consumer collected as a new event.
+  *
+  * @param consumer callback invoked exactly once with the tablet view of this container
+  * @return an event built from the rows the consumer passed to the collector
+  */
+ public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ final PipeRowCollector rowCollector = new PipeRowCollector();
+ consumer.accept(convertToTablet(), rowCollector);
+ return rowCollector.toTabletInsertionEvent();
+ }
+
+ //////////////////////////// convert ////////////////////////////
+
+ /**
+  * Materializes the filtered columns as a {@link Tablet} and caches the result.
+  *
+  * <p>The tablet shares this container's timestamp array and null bitmaps; the value columns
+  * are converted back to primitive arrays.
+  *
+  * @return the tablet view of this container; the same instance is returned on every call
+  */
+ public Tablet convertToTablet() {
+   if (tablet != null) {
+     return tablet;
+   }
+
+   // When the pattern filtered out every column there is no valueColumns[0] to measure, so
+   // the timestamp column provides the tablet capacity instead of failing with an
+   // ArrayIndexOutOfBoundsException.
+   final int rowSize =
+       valueColumns.length == 0 ? timestampColumn.length : valueColumns[0].length;
+
+   final List<MeasurementSchema> measurementSchemaArrayList =
+       new ArrayList<>(Arrays.asList(measurementSchemaList));
+
+   final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowSize);
+   newTablet.timestamps = timestampColumn;
+   newTablet.bitMaps = nullValueColumnBitmaps;
+   newTablet.values = squashFromColumnList(valueColumns, valueColumnTypes);
+   newTablet.rowSize = rowCount;
+
+   tablet = newTablet;
+   return tablet;
+ }
+
+ /**
+  * Converts every boxed value column back into the primitive array form of its data type.
+  *
+  * @param valueColumns boxed value columns, one array per column
+  * @param valueColumnTypes data type of each column, parallel to {@code valueColumns}
+  * @return one converted array per column, in the same order
+  */
+ private Object[] squashFromColumnList(Object[][] valueColumns, TSDataType[] valueColumnTypes) {
+   final Object[] squashedColumns = new Object[valueColumns.length];
+   Arrays.setAll(
+       squashedColumns,
+       column -> squashFromColumn(valueColumns[column], valueColumnTypes[column]));
+   return squashedColumns;
+ }
+
+ /**
+  * Converts one boxed value column into the primitive array form of its data type.
+  *
+  * <p>Cells are cast one by one instead of casting the whole array: a column allocated as a
+  * plain {@code Object[]} (as the single-row parse path does) cannot be cast to
+  * {@code Integer[]}, {@code Long[]} and so on, even when every element has the right type.
+  * {@code null} cells become the primitive default ({@code 0} / {@code false}); for TEXT they
+  * stay {@code null}.
+  *
+  * @param valueColumn boxed cells of the column
+  * @param valueColumnType data type of the column, which selects the target array type
+  * @return an {@code int[]}, {@code long[]}, {@code float[]}, {@code double[]},
+  *     {@code boolean[]} or {@code Binary[]} of the same length as {@code valueColumn}
+  * @throws UnSupportedDataTypeException if {@code valueColumnType} is none of the handled types
+  */
+ private Object squashFromColumn(Object[] valueColumn, TSDataType valueColumnType) {
+   final int length = valueColumn.length;
+   switch (valueColumnType) {
+     case INT32:
+       final int[] intValues = new int[length];
+       for (int i = 0; i < length; i++) {
+         intValues[i] = valueColumn[i] == null ? 0 : (Integer) valueColumn[i];
+       }
+       return intValues;
+     case INT64:
+       final long[] longValues = new long[length];
+       for (int i = 0; i < length; i++) {
+         longValues[i] = valueColumn[i] == null ? 0 : (Long) valueColumn[i];
+       }
+       return longValues;
+     case FLOAT:
+       final float[] floatValues = new float[length];
+       for (int i = 0; i < length; i++) {
+         floatValues[i] = valueColumn[i] == null ? 0 : (Float) valueColumn[i];
+       }
+       return floatValues;
+     case DOUBLE:
+       final double[] doubleValues = new double[length];
+       for (int i = 0; i < length; i++) {
+         doubleValues[i] = valueColumn[i] == null ? 0 : (Double) valueColumn[i];
+       }
+       return doubleValues;
+     case BOOLEAN:
+       final boolean[] booleanValues = new boolean[length];
+       for (int i = 0; i < length; i++) {
+         booleanValues[i] = valueColumn[i] != null && (Boolean) valueColumn[i];
+       }
+       return booleanValues;
+     case TEXT:
+       final Binary[] binaryValues = new Binary[length];
+       for (int i = 0; i < length; i++) {
+         final Object cell = valueColumn[i];
+         if (cell == null) {
+           binaryValues[i] = null;
+         } else if (cell instanceof Binary) {
+           // Presumably the single-row parse path stores raw Binary cells - confirm against
+           // InsertRowNode#getValues(); either way a Binary cell is passed through unchanged.
+           binaryValues[i] = (Binary) cell;
+         } else {
+           binaryValues[i] = Binary.valueOf((String) cell);
+         }
+       }
+       return binaryValues;
+     default:
+       throw new UnSupportedDataTypeException(
+           String.format("Data type %s is not supported.", valueColumnType));
+   }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
new file mode 100644
index 00000000000..260814d609f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.view.datastructure;
+
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TsFileInsertionDataContainer {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
+
+ private final File tsFile;
+ private final String pattern;
+
+ private TimeseriesMetadata vectorTimeseriesMetadata;
+
+ private final Map<String, List<TimeseriesMetadata>>
device2TimeseriesMetadataMap;
+
+ /**
+  * Creates a container for {@code tsFile} and immediately collects, per device, the
+  * timeseries metadata matched by {@code pattern}.
+  *
+  * @param tsFile the TsFile to read
+  * @param pattern path prefix or full series path used to filter series; may be {@code null}
+  */
+ public TsFileInsertionDataContainer(File tsFile, String pattern) {
+ this.tsFile = tsFile;
+ this.pattern = pattern;
+
+ // Reads the file's metadata as part of construction.
+ this.device2TimeseriesMetadataMap = collectDevice2TimeseriesMetadataMap();
+ }
+
+ /**
+  * Reads the timeseries metadata of the TsFile and keeps, per device, the entries matched by
+  * {@code pattern}.
+  *
+  * <p>When individual measurements of a device are matched and a VECTOR metadata entry was
+  * seen earlier in that device's list, that entry is placed once in front of the matched
+  * measurements. The remembered VECTOR entry is reset for every device, so a device without
+  * one never receives {@code null} or the entry of a previously visited device.
+  *
+  * <p>An {@link IOException} while reading is logged and whatever was collected so far is
+  * returned.
+  *
+  * @return map from device to its matched timeseries metadata; devices without a match are
+  *     absent
+  */
+ private Map<String, List<TimeseriesMetadata>> collectDevice2TimeseriesMetadataMap() {
+   final Map<String, List<TimeseriesMetadata>> result = new HashMap<>();
+
+   try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getPath())) {
+     // match pattern
+     for (Map.Entry<String, List<TimeseriesMetadata>> entry :
+         reader.getAllTimeseriesMetadata(true).entrySet()) {
+       final String device = entry.getKey();
+
+       // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
+       // in this case, all data can be matched without checking the measurements
+       if (pattern == null || pattern.length() <= device.length() && device.startsWith(pattern)) {
+         result.put(device, entry.getValue());
+         continue;
+       }
+
+       // case 2: for example, pattern is root.a.b.c and device is root.a.b
+       // in this case, we need to check the full path
+       final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+       // Forget the VECTOR entry remembered for any previously visited device.
+       vectorTimeseriesMetadata = null;
+       boolean isVectorAdded = false;
+
+       for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+         // TODO: test me!!!
+         if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+           vectorTimeseriesMetadata = timeseriesMetadata;
+           isVectorAdded = false;
+           continue;
+         }
+
+         final String measurement = timeseriesMetadata.getMeasurementId();
+         // low cost check comes first
+         if (pattern.length() == measurement.length() + device.length() + 1
+             // high cost check comes later
+             && pattern.startsWith(device)
+             && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+           // Put the device's VECTOR entry, if it has one, in front of the first match.
+           if (!isVectorAdded && vectorTimeseriesMetadata != null) {
+             isVectorAdded = true;
+             timeseriesMetadataList.add(vectorTimeseriesMetadata);
+           }
+           timeseriesMetadataList.add(timeseriesMetadata);
+         }
+       }
+
+       if (!timeseriesMetadataList.isEmpty()) {
+         result.put(device, timeseriesMetadataList);
+       }
+     }
+   } catch (IOException e) {
+     LOGGER.error("Cannot read TsFile {}.", tsFile.getPath(), e);
+   }
+
+   return result;
+ }
+
+ /**
+  * Exposes the matched data of the TsFile as a sequence of tablet insertion events.
+  *
+  * <p>Every call to {@code iterator()} on the returned iterable starts a fresh tablet
+  * iteration; each tablet is wrapped into a {@code PipeTabletInsertionEvent} on demand.
+  *
+  * @return a lazily evaluated iterable of tablet insertion events
+  */
+ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+   return () -> {
+     final Iterator<Tablet> tablets = constructTabletIterable().iterator();
+     return new Iterator<TabletInsertionEvent>() {
+
+       @Override
+       public boolean hasNext() {
+         return tablets.hasNext();
+       }
+
+       @Override
+       public TabletInsertionEvent next() {
+         final Tablet nextTablet = tablets.next();
+         return new PipeTabletInsertionEvent(nextTablet);
+       }
+     };
+   };
+ }
+
+ /**
+  * Returns an iterable whose every {@code iterator()} call creates a new
+  * {@code TsFileInsertionDataTabletIterator} over this file's path and the collected
+  * device-to-metadata map.
+  *
+  * @return an iterable over the tablets read from the TsFile
+  */
+ private Iterable<Tablet> constructTabletIterable() {
+ return () ->
+ new TsFileInsertionDataTabletIterator(tsFile.getPath(),
device2TimeseriesMetadataMap);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
new file mode 100644
index 00000000000..7100e68b753
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.view.datastructure;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
+
+ private static final Logger LOGGER =
+     LoggerFactory.getLogger(TsFileInsertionDataTabletIterator.class);
+
+ // Opened in the constructor and closed by hasNext() once nothing is left to iterate.
+ private final TsFileSequenceReader reader;
+ // Path of the TsFile being read; also used in log and error messages.
+ private final String filePath;
+ // Walks the (device -> timeseries metadata list) entries handed to the constructor.
+ private final Iterator<Map.Entry<String, List<TimeseriesMetadata>>> entriesIterator;
+ // Device entry currently being consumed; its key is used as the tablet's device id.
+ private Map.Entry<String, List<TimeseriesMetadata>> currentEntry;
+ // Walks the timeseries metadata of currentEntry.
+ private Iterator<TimeseriesMetadata> timeseriesMetadataIterator;
+ // Metadata most recently taken from timeseriesMetadataIterator.
+ private TimeseriesMetadata currentTimeseriesMetadata;
+ // Schemas of the tablet under construction; recreated on every next() call.
+ private List<MeasurementSchema> measurementSchemas;
+
+ // Set to true once a VECTOR (time column) entry has been processed.
+ private boolean isAligned;
+ // Time batches of the pages of the last time column chunk, indexed by page.
+ private final List<long[]> timeBatches;
+ // Timestamps cached from the time column, reused for tablets built while isAligned is true.
+ private long[] timestampsForAligned;
+
+ /**
+  * Creates an iterator over the tablets stored in one TsFile.
+  *
+  * @param filePath path of the TsFile to read
+  * @param device2TimeseriesMetadataMap timeseries metadata grouped by device; devices are
+  *     visited in the iteration order of this map's entry set
+  * @throws PipeException if a {@code TsFileSequenceReader} cannot be opened for {@code filePath}
+  */
+ public TsFileInsertionDataTabletIterator(
+     String filePath, Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap) {
+   this.filePath = filePath;
+   this.entriesIterator = device2TimeseriesMetadataMap.entrySet().iterator();
+   this.timeBatches = new ArrayList<>();
+   this.currentEntry = null;
+   this.timeseriesMetadataIterator = null;
+   this.currentTimeseriesMetadata = null;
+   this.measurementSchemas = null;
+   this.isAligned = false;
+   this.timestampsForAligned = null;
+   try {
+     this.reader = new TsFileSequenceReader(filePath);
+   } catch (IOException e) {
+     throw new PipeException("Cannot create TsFileSequenceReader for file " + filePath, e);
+   }
+
+   // Position on the first device if there is one; otherwise install an empty placeholder
+   // so that hasNext() never has to deal with a null iterator.
+   if (entriesIterator.hasNext()) {
+     currentEntry = entriesIterator.next();
+     timeseriesMetadataIterator = currentEntry.getValue().iterator();
+   } else {
+     timeseriesMetadataIterator =
+         new Iterator<TimeseriesMetadata>() {
+           @Override
+           public boolean hasNext() {
+             return false;
+           }
+
+           @Override
+           public TimeseriesMetadata next() {
+             // Honour the Iterator contract instead of handing out null.
+             throw new NoSuchElementException();
+           }
+         };
+   }
+ }
+
+ /**
+  * Reports whether another timeseries metadata entry is left, either for the current device
+  * or for a device not yet visited.
+  *
+  * <p>When nothing is left, the reader opened by the constructor is closed; a failure to close
+  * it is logged and otherwise ignored.
+  *
+  * @return {@code true} if {@code next()} can be called
+  */
+ @Override
+ public boolean hasNext() {
+   if (timeseriesMetadataIterator.hasNext() || entriesIterator.hasNext()) {
+     return true;
+   }
+
+   // Exhausted: release the file handle held since construction.
+   try {
+     reader.close();
+   } catch (IOException e) {
+     LOGGER.warn("Cannot close TsFileSequenceReader for file {}", filePath, e);
+   }
+   return false;
+ }
+
+ /**
+  * Builds the tablet for the next timeseries metadata entry.
+  *
+  * <p>If the entry is a VECTOR (time column) entry, it is processed first and the tablet is
+  * then built from the entry that follows it.
+  *
+  * @return the tablet produced by {@code processTimeseriesMetadata} for the entry
+  * @throws NoSuchElementException if no entry is left
+  * @throws UncheckedIOException if the TsFile cannot be opened or closed
+  */
+ @Override
+ public Tablet next() {
+   if (!hasNext()) {
+     throw new NoSuchElementException();
+   }
+
+   if (!timeseriesMetadataIterator.hasNext()) {
+     // Current device is finished, move on to the next one.
+     currentEntry = entriesIterator.next();
+     timeseriesMetadataIterator = currentEntry.getValue().iterator();
+     // Aligned state belongs to a single device. Without this reset the new device would be
+     // treated as aligned and its tablets would carry the previous device's timestamps.
+     isAligned = false;
+     timestampsForAligned = null;
+   }
+   currentTimeseriesMetadata = timeseriesMetadataIterator.next();
+   measurementSchemas = new ArrayList<>();
+
+   // A dedicated reader is used per call; it is named so that it does not shadow the field.
+   try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(filePath)) {
+     if (currentTimeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+       // The time column only primes the aligned state; the tablet comes from the entry
+       // after it. NOTE(review): assumes a VECTOR entry is always followed by at least one
+       // value entry for the same device - confirm against the code that builds the map.
+       processTimeseriesMetadata(currentTimeseriesMetadata, sequenceReader);
+       currentTimeseriesMetadata = timeseriesMetadataIterator.next();
+     }
+     return processTimeseriesMetadata(currentTimeseriesMetadata, sequenceReader);
+   } catch (IOException e) {
+     throw new UncheckedIOException(e);
+   }
+ }
+
+ /**
+  * Wraps decoded column data in a tablet for the current device.
+  *
+  * <p>While {@code isAligned} is set, the first call only caches {@code timestamps} in {@code
+  * timestampsForAligned} and returns {@code null}; later calls ignore the argument and use the
+  * cached timestamps instead.
+  *
+  * @param timestamps timestamps decoded for the series just processed
+  * @param values values to store in the tablet
+  * @param bitMaps bitmaps to store in the tablet
+  * @return the tablet, or {@code null} when the call only cached the aligned timestamps
+  */
+ private Tablet createTablet(long[] timestamps, Object[] values, BitMap[] bitMaps) {
+   final long[] effectiveTimestamps;
+   if (!isAligned) {
+     effectiveTimestamps = timestamps;
+   } else if (timestampsForAligned != null) {
+     effectiveTimestamps = timestampsForAligned;
+   } else {
+     // First aligned call: remember the time column, there is nothing to emit yet.
+     timestampsForAligned = timestamps;
+     return null;
+   }
+
+   final int rowCount = effectiveTimestamps.length;
+   final Tablet result = new Tablet(currentEntry.getKey(), measurementSchemas, rowCount);
+   result.rowSize = rowCount;
+   result.timestamps = effectiveTimestamps;
+   result.values = values;
+   result.bitMaps = bitMaps;
+   return result;
+ }
+
+ /**
+  * Decodes every chunk referenced by the given timeseries metadata and passes the collected
+  * timestamps, values and bitmap bytes to createTablet.
+  *
+  * @param timeseriesMetadata metadata whose chunk list is read
+  * @param reader reader used to seek to and decode the chunks
+  * @return whatever createTablet returns for the collected data (may be null)
+  * @throws UncheckedIOException if reading a chunk fails with an IOException
+  */
+ private Tablet processTimeseriesMetadata(
+ TimeseriesMetadata timeseriesMetadata, TsFileSequenceReader reader) {
+ int pageIndex = 0;
+ // A VECTOR entry switches the iterator into aligned mode and contributes no schema;
+ // any other entry registers a schema for the tablet under construction.
+ if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+ isAligned = true;
+ timeBatches.clear();
+ } else {
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ timeseriesMetadata.getMeasurementId(),
timeseriesMetadata.getTSDataType());
+ measurementSchemas.add(measurementSchema);
+ }
+
+ // Accumulators shared by all chunks and pages of this series.
+ List<Byte> bitMapBytes = new ArrayList<>();
+ List<Object> measurementValues = new ArrayList<>();
+ List<Long> measurementTimestamps = new ArrayList<>();
+
+ for (IChunkMetadata chunkMetadata :
timeseriesMetadata.getChunkMetadataList()) {
+ long offset = chunkMetadata.getOffsetOfChunkHeader();
+ try {
+ // Seek to the chunk and read its header to learn type, encoding and data size.
+ reader.position(offset);
+ ChunkHeader header = reader.readChunkHeader(reader.readMarker());
+ int dataSize = header.getDataSize();
+
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(),
header.getDataType());
+ // Page numbering restarts with every chunk.
+ pageIndex = 0;
+ if (header.getDataType() == TSDataType.VECTOR) {
+ timeBatches.clear();
+ }
+
+ // Consume pages until the chunk's data size has been used up.
+ while (dataSize > 0) {
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(), (header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
+
+ // Time column chunk
+ if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK) {
+ TimePageReader timePageReader =
+ new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+ long[] timeBatch = timePageReader.getNextTimeBatch();
+ // Kept per page so that value pages can later be decoded against the same page index.
+ timeBatches.add(timeBatch);
+
+ for (long time : timeBatch) {
+ measurementTimestamps.add(time);
+ }
+ }
+ // Value column chunk
+ else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
+ == TsFileConstant.VALUE_COLUMN_MASK) {
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData,
header.getDataType(), valueDecoder);
+
+ // NOTE(review): the page bitmap bytes are copied as-is into the tablet bitmap bytes;
+ // confirm both use the same bit meaning (null vs. present).
+ for (byte value : valuePageReader.getBitmap()) {
+ bitMapBytes.add(value);
+ }
+
+ // Values are decoded against the time batch of the page with the same index.
+ for (TsPrimitiveType value :
+ valuePageReader.nextValueBatch(timeBatches.get(pageIndex))) {
+ measurementValues.add(value.getValue());
+ }
+ }
+
+ // NonAligned Chunk
+ else {
+ PageReader pageReader =
+ new PageReader(
+ pageData, header.getDataType(), valueDecoder,
defaultTimeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ // Positions (within this page) of values that came back as null.
+ List<Integer> isNullList = new ArrayList<>();
+ int index = 0;
+ while (batchData.hasCurrent()) {
+ measurementTimestamps.add(batchData.currentTime());
+ Object value = batchData.currentValue();
+
+ if (value == null) {
+ isNullList.add(index);
+ }
+ measurementValues.add(value);
+ index++;
+ batchData.next();
+ }
+
+ // NOTE(review): the bitmap is sized with measurementTimestamps.size(), which grows
+ // across pages, while index restarts at 0 for every page - verify for multi-page data.
+ BitMap bitmap = new BitMap(measurementTimestamps.size());
+ for (int isNull : isNullList) {
+ bitmap.mark(isNull);
+ }
+ byte[] bytes = bitmap.getByteArray();
+ for (byte value : bytes) {
+ bitMapBytes.add(value);
+ }
+ }
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ // Unbox the collected timestamps into a primitive array.
+ long[] timestamps = new long[measurementTimestamps.size()];
+ for (int i = 0; i < measurementTimestamps.size(); i++) {
+ timestamps[i] = measurementTimestamps.get(i);
+ }
+
+ // Unbox the collected bitmap bytes into a primitive array.
+ byte[] byteArray = new byte[bitMapBytes.size()];
+ for (int i = 0; i < bitMapBytes.size(); i++) {
+ byteArray[i] = bitMapBytes.get(i);
+ }
+ // NOTE(review): the BitMap size argument is the number of bytes here; presumably it should
+ // be a number of bits - confirm against BitMap(int, byte[]).
+ BitMap[] bitMaps = new BitMap[] {new BitMap(byteArray.length, byteArray)};
+
+ // NOTE(review): values is passed as one boxed element per row, whereas Tablet serialization
+ // casts each entry of values to a primitive column array - verify the expected layout.
+ return createTablet(timestamps, measurementValues.toArray(), bitMaps);
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
similarity index 55%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
copy to
server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
index bc56a8bb3cf..62979b7d52c 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
@@ -17,8 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+package org.apache.iotdb.db.pipe.core.processor;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -27,10 +29,11 @@ import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import java.io.IOException;
-public class DoNothingProcessor implements PipeProcessor {
+public class PipeDoNothingProcessor implements PipeProcessor {
@Override
public void validate(PipeParameterValidator validator) {
@@ -46,13 +49,46 @@ public class DoNothingProcessor implements PipeProcessor {
@Override
public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
throws IOException {
- eventCollector.collect(tabletInsertionEvent);
+ if (tabletInsertionEvent instanceof EnrichedEvent) {
+ final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
+ if (enrichedEvent
+ .getPattern()
+ .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+ eventCollector.collect(tabletInsertionEvent);
+ } else {
+ eventCollector.collect(
+ tabletInsertionEvent.processRowByRow(
+ (row, rowCollector) -> {
+ try {
+ rowCollector.collectRow(row);
+ } catch (IOException e) {
+ throw new PipeException("Failed to collect row", e);
+ }
+ }));
+ }
+ } else {
+ eventCollector.collect(tabletInsertionEvent);
+ }
}
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent,
EventCollector eventCollector)
throws IOException {
- eventCollector.collect(tsFileInsertionEvent);
+ if (tsFileInsertionEvent instanceof EnrichedEvent) {
+ final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
+ if (enrichedEvent
+ .getPattern()
+ .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+ eventCollector.collect(tsFileInsertionEvent);
+ } else {
+ for (final TabletInsertionEvent tabletInsertionEvent :
+ tsFileInsertionEvent.toTabletInsertionEvents()) {
+ eventCollector.collect(tabletInsertionEvent);
+ }
+ }
+ } else {
+ eventCollector.collect(tsFileInsertionEvent);
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 7af0df4ecff..b21a3a85c34 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,8 +20,11 @@
package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
+import org.apache.iotdb.db.pipe.core.processor.PipeDoNothingProcessor;
import
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.queue.BlockingPendingQueue;
@@ -68,7 +71,14 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
this.pipeProcessorParameters = pipeProcessorParameters;
final String taskId = pipeName + "_" + dataRegionId;
- pipeProcessor =
PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+ pipeProcessor =
+ pipeProcessorParameters
+ .getStringOrDefault(
+ PipeProcessorConstant.PROCESSOR_KEY,
+ BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
+
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
+ ? new PipeDoNothingProcessor()
+ : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
final PipeEventCollector pipeConnectorOutputEventCollector =
new PipeEventCollector(pipeConnectorOutputPendingQueue);
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 6deaed9343d..4bc74e6d929 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -131,12 +131,12 @@ public class CachedSchemaPatternMatcherTest {
for (int j = 0; j < deviceNum; j++) {
PipeRealtimeCollectEvent event =
new PipeRealtimeCollectEvent(
- null, null, Collections.singletonMap("root." + i,
measurements));
+ null, null, Collections.singletonMap("root." + i,
measurements), "root");
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(collector -> collector.collect(event));
totalTime += (System.currentTimeMillis() - startTime);
}
- PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null,
null, deviceMap);
+ PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null,
null, deviceMap, "root");
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(collector -> collector.collect(event));
totalTime += (System.currentTimeMillis() - startTime);
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index 24be2f73c6f..ce70aecb1da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -378,6 +378,7 @@ public class Tablet {
}
private void writeTimes(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(timestamps.length, stream);
for (long time : timestamps) {
ReadWriteIOUtils.write(time, stream);
}
@@ -385,14 +386,15 @@ public class Tablet {
/** Serialize bitmaps */
private void writeBitMaps(DataOutputStream stream) throws IOException {
- ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
+ ReadWriteIOUtils.write(bitMaps != null ? 1 : 0, stream);
if (bitMaps != null) {
for (BitMap bitMap : bitMaps) {
if (bitMap == null) {
- ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
+ ReadWriteIOUtils.write(0, stream);
} else {
- ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
- stream.write(bitMap.getByteArray());
+ ReadWriteIOUtils.write(1, stream);
+ ReadWriteIOUtils.write(bitMap.getSize(), stream);
+ ReadWriteIOUtils.write(new Binary(bitMap.getByteArray()), stream);
}
}
}
@@ -410,38 +412,48 @@ public class Tablet {
switch (dataType) {
case INT32:
int[] intValues = (int[]) column;
- for (int j = 0; j < rowSize; j++) {
+ ReadWriteIOUtils.write(intValues.length, stream);
+ for (int j = 0; j < intValues.length; j++) {
ReadWriteIOUtils.write(intValues[j], stream);
}
break;
case INT64:
long[] longValues = (long[]) column;
- for (int j = 0; j < rowSize; j++) {
+ ReadWriteIOUtils.write(longValues.length, stream);
+ for (int j = 0; j < longValues.length; j++) {
ReadWriteIOUtils.write(longValues[j], stream);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
- for (int j = 0; j < rowSize; j++) {
+ ReadWriteIOUtils.write(floatValues.length, stream);
+ for (int j = 0; j < floatValues.length; j++) {
ReadWriteIOUtils.write(floatValues[j], stream);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
- for (int j = 0; j < rowSize; j++) {
+ ReadWriteIOUtils.write(doubleValues.length, stream);
+ for (int j = 0; j < doubleValues.length; j++) {
ReadWriteIOUtils.write(doubleValues[j], stream);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
- for (int j = 0; j < rowSize; j++) {
- ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
+ ReadWriteIOUtils.write(boolValues.length, stream);
+ for (int j = 0; j < boolValues.length; j++) {
+ ReadWriteIOUtils.write(boolValues[j] ? 1 : 0, stream);
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) column;
- for (int j = 0; j < rowSize; j++) {
- ReadWriteIOUtils.write(binaryValues[j], stream);
+ ReadWriteIOUtils.write(binaryValues.length, stream);
+ for (int j = 0; j < binaryValues.length; j++) {
+ boolean isNull = (binaryValues[j] == null);
+ ReadWriteIOUtils.write(isNull ? 1 : 0, stream);
+ if (!isNull) {
+ ReadWriteIOUtils.write(binaryValues[j], stream);
+ }
}
break;
default:
@@ -466,22 +478,23 @@ public class Tablet {
}
// deserialize times
- long[] times = new long[rowSize];
- for (int i = 0; i < rowSize; i++) {
+ int timesSize = ReadWriteIOUtils.readInt(byteBuffer);
+ long[] times = new long[timesSize];
+ for (int i = 0; i < timesSize; i++) {
times[i] = ReadWriteIOUtils.readLong(byteBuffer);
}
// deserialize bitmaps
- boolean hasBitMaps = BytesUtils.byteToBool(byteBuffer.get());
+ boolean hasBitMaps = (ReadWriteIOUtils.readInt(byteBuffer) == 1);
BitMap[] bitMaps = null;
if (hasBitMaps) {
- bitMaps = readBitMapsFromBuffer(byteBuffer, schemaSize, rowSize);
+ bitMaps = readBitMapsFromBuffer(byteBuffer, schemaSize);
}
// deserialize values
TSDataType[] dataTypes =
schemas.stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new);
- Object[] values = readTabletValuesFromBuffer(byteBuffer, dataTypes,
schemaSize, rowSize);
+ Object[] values = readTabletValuesFromBuffer(byteBuffer, dataTypes,
schemaSize);
Tablet tablet = new Tablet(deviceId, schemas, times, values, bitMaps,
rowSize);
tablet.constructMeasurementIndexMap();
@@ -489,19 +502,14 @@ public class Tablet {
}
/** deserialize bitmaps */
- public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns,
int size) {
- if (!buffer.hasRemaining()) {
- return null;
- }
+ public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns)
{
BitMap[] bitMaps = new BitMap[columns];
for (int i = 0; i < columns; i++) {
- boolean hasBitMap = BytesUtils.byteToBool(buffer.get());
+ boolean hasBitMap = (ReadWriteIOUtils.readInt(buffer) == 1);
if (hasBitMap) {
- byte[] bytes = new byte[size / Byte.SIZE + 1];
- for (int j = 0; j < bytes.length; j++) {
- bytes[j] = buffer.get();
- }
- bitMaps[i] = new BitMap(size, bytes);
+ int bitMapSize = ReadWriteIOUtils.readInt(buffer);
+ byte[] bytes = ReadWriteIOUtils.readBinary(buffer).getValues();
+ bitMaps[i] = new BitMap(bitMapSize, bytes);
}
}
return bitMaps;
@@ -510,56 +518,56 @@ public class Tablet {
/**
* @param buffer data values
* @param columns column number
- * @param size value count in each column
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public static Object[] readTabletValuesFromBuffer(
- ByteBuffer buffer, TSDataType[] types, int columns, int size) {
+ ByteBuffer buffer, TSDataType[] types, int columns) {
Object[] values = new Object[columns];
for (int i = 0; i < columns; i++) {
+ int arraySize = ReadWriteIOUtils.readInt(buffer);
switch (types[i]) {
case BOOLEAN:
- boolean[] boolValues = new boolean[size];
- for (int index = 0; index < size; index++) {
- boolValues[index] = BytesUtils.byteToBool(buffer.get());
+ boolean[] boolValues = new boolean[arraySize];
+ for (int index = 0; index < arraySize; index++) {
+ boolValues[index] = ReadWriteIOUtils.readInt(buffer) == 1;
}
values[i] = boolValues;
break;
case INT32:
- int[] intValues = new int[size];
- for (int index = 0; index < size; index++) {
- intValues[index] = buffer.getInt();
+ int[] intValues = new int[arraySize];
+ for (int index = 0; index < arraySize; index++) {
+ intValues[index] = ReadWriteIOUtils.readInt(buffer);
}
values[i] = intValues;
break;
case INT64:
- long[] longValues = new long[size];
- for (int index = 0; index < size; index++) {
- longValues[index] = buffer.getLong();
+ long[] longValues = new long[arraySize];
+ for (int index = 0; index < arraySize; index++) {
+ longValues[index] = ReadWriteIOUtils.readLong(buffer);
}
values[i] = longValues;
break;
case FLOAT:
- float[] floatValues = new float[size];
- for (int index = 0; index < size; index++) {
- floatValues[index] = buffer.getFloat();
+ float[] floatValues = new float[arraySize];
+ for (int index = 0; index < arraySize; index++) {
+ floatValues[index] = ReadWriteIOUtils.readFloat(buffer);
}
values[i] = floatValues;
break;
case DOUBLE:
- double[] doubleValues = new double[size];
- for (int index = 0; index < size; index++) {
- doubleValues[index] = buffer.getDouble();
+ double[] doubleValues = new double[arraySize];
+ for (int index = 0; index < arraySize; index++) {
+ doubleValues[index] = ReadWriteIOUtils.readDouble(buffer);
}
values[i] = doubleValues;
break;
case TEXT:
- Binary[] binaryValues = new Binary[size];
- for (int index = 0; index < size; index++) {
- int binarySize = buffer.getInt();
- byte[] binaryValue = new byte[binarySize];
- buffer.get(binaryValue);
- binaryValues[index] = new Binary(binaryValue);
+ Binary[] binaryValues = new Binary[arraySize];
+ for (int index = 0; index < arraySize; index++) {
+ boolean isNull = (ReadWriteIOUtils.readInt(buffer) == 1);
+ if (!isNull) {
+ binaryValues[index] = ReadWriteIOUtils.readBinary(buffer);
+ }
}
values[i] = binaryValues;
break;
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
index f37712197a8..34ad5a5a7d9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.write.record;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
@@ -29,7 +30,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TabletTest {
@@ -52,11 +53,18 @@ public class TabletTest {
((long[]) values[1])[i] = 1;
}
- Tablet tablet = new Tablet(deviceId, measurementSchemas, timestamps,
values, null, rowSize);
+ Tablet tablet =
+ new Tablet(
+ deviceId,
+ measurementSchemas,
+ timestamps,
+ values,
+ new BitMap[] {new BitMap(1024), new BitMap(1024)},
+ rowSize);
try {
ByteBuffer byteBuffer = tablet.serialize();
Tablet newTablet = Tablet.deserialize(byteBuffer);
- assertEquals(newTablet, tablet);
+ assertTrue(newTablet.equals(tablet));
} catch (Exception e) {
e.printStackTrace();
fail();