This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b87c6463ff1 Pipe Subscription: initialize the subscription client RPC
payload (#12121)
b87c6463ff1 is described below
commit b87c6463ff1e16823b07197375e75b75a6634c1b
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Mar 6 23:45:05 2024 +0800
Pipe Subscription: initialize the subscription client RPC payload (#12121)
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 11 ++
.../iotdb/rpc/subscription/EnrichedRowRecord.java | 34 ++++++
.../rpc/subscription/IoTDBSubscriptionDataSet.java | 35 ++++++
.../payload/request/ConsumerConfig.java | 85 ++++++++++++++
.../payload/request/PipeSubscribeCloseReq.java | 74 ++++++++++++
.../payload/request/PipeSubscribeCommitReq.java | 110 ++++++++++++++++++
.../payload/request/PipeSubscribeHandshakeReq.java | 97 ++++++++++++++++
.../payload/request/PipeSubscribeHeartbeatReq.java | 75 +++++++++++++
.../payload/request/PipeSubscribePollReq.java | 96 ++++++++++++++++
.../payload/request/PipeSubscribeRequestType.java | 60 ++++++++++
.../request/PipeSubscribeRequestVersion.java | 35 ++++++
.../payload/request/PipeSubscribeSubscribeReq.java | 100 +++++++++++++++++
.../request/PipeSubscribeUnsubscribeReq.java | 101 +++++++++++++++++
.../payload/response/EnrichedTablets.java | 89 +++++++++++++++
.../payload/response/PipeSubscribeCloseResp.java | 78 +++++++++++++
.../payload/response/PipeSubscribeCommitResp.java | 78 +++++++++++++
.../response/PipeSubscribeHandshakeResp.java | 125 +++++++++++++++++++++
.../response/PipeSubscribeHeartbeatResp.java | 79 +++++++++++++
.../payload/response/PipeSubscribePollResp.java | 110 ++++++++++++++++++
.../response/PipeSubscribeResponseType.java | 55 +++++++++
.../response/PipeSubscribeResponseVersion.java | 35 ++++++
.../response/PipeSubscribeSubscribeResp.java | 79 +++++++++++++
.../response/PipeSubscribeUnsubscribeResp.java | 79 +++++++++++++
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 8 ++
.../thrift-datanode/src/main/thrift/client.thrift | 15 +++
25 files changed, 1743 insertions(+)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b8407bb86de..19a2a282764 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -221,6 +221,17 @@ public enum TSStatusCode {
PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR(1805),
PIPE_NOT_EXIST_ERROR(1806),
PIPE_PUSH_META_ERROR(1807),
+
+ // Subscription
+ SUBSCRIPTION_VERSION_ERROR(1900),
+ SUBSCRIPTION_TYPE_ERROR(1901),
+ SUBSCRIPTION_HANDSHAKE_ERROR(1902),
+ SUBSCRIPTION_HEARTBEAT_ERROR(1903),
+ SUBSCRIPTION_POLL_ERROR(1904),
+ SUBSCRIPTION_COMMIT_ERROR(1905),
+ SUBSCRIPTION_CLOSE_ERROR(1906),
+ SUBSCRIPTION_SUBSCRIBE_ERROR(1907),
+ SUBSCRIPTION_UNSUBSCRIBE_ERROR(1908),
;
private final int statusCode;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/EnrichedRowRecord.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/EnrichedRowRecord.java
new file mode 100644
index 00000000000..26ad041934a
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/EnrichedRowRecord.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.util.List;
+
+public class EnrichedRowRecord {
+
+ private String topicName;
+ private RowRecord record;
+ private List<String> columnNameList;
+ private List<String> columnTypeList;
+
+ // TODO: translate EnrichedTablets to EnrichedRowRecord
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/IoTDBSubscriptionDataSet.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/IoTDBSubscriptionDataSet.java
new file mode 100644
index 00000000000..894c15e0e48
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/IoTDBSubscriptionDataSet.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription;
+
+import java.util.Iterator;
+
+public class IoTDBSubscriptionDataSet implements Iterator<EnrichedRowRecord> {
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public EnrichedRowRecord next() {
+ return null;
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/ConsumerConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/ConsumerConfig.java
new file mode 100644
index 00000000000..a312182871c
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/ConsumerConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ConsumerConfig {
+
+ private transient String consumerGroupID;
+ private transient String consumerClientID;
+
+ // TODO: more configs
+
+ public String getConsumerClientID() {
+ return consumerClientID;
+ }
+
+ public String getConsumerGroupID() {
+ return consumerGroupID;
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(consumerGroupID, stream);
+ ReadWriteIOUtils.write(consumerClientID, stream);
+ }
+
+ public static ConsumerConfig deserialize(ByteBuffer buffer) {
+ final ConsumerConfig consumerConfig = new ConsumerConfig();
+ consumerConfig.consumerGroupID = ReadWriteIOUtils.readString(buffer);
+ consumerConfig.consumerClientID = ReadWriteIOUtils.readString(buffer);
+ return consumerConfig;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ ConsumerConfig that = (ConsumerConfig) obj;
+ return Objects.equals(this.consumerGroupID, that.consumerGroupID)
+ && Objects.equals(this.consumerClientID, that.consumerClientID);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(consumerGroupID, consumerClientID);
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerConfig{"
+ + "consumerGroupID='"
+ + consumerGroupID
+ + "', consumerClientID="
+ + consumerClientID
+ + "}";
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCloseReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCloseReq.java
new file mode 100644
index 00000000000..724f0248943
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCloseReq.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+
+import java.util.Objects;
+
+public class PipeSubscribeCloseReq extends TPipeSubscribeReq {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeCloseReq`, called by
the subscription
+ * client.
+ */
+ public static PipeSubscribeCloseReq toTPipeSubscribeReq() {
+ final PipeSubscribeCloseReq req = new PipeSubscribeCloseReq();
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.CLOSE.getType();
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribeCloseReq fromTPipeSubscribeReq(TPipeSubscribeReq
closeReq) {
+ final PipeSubscribeCloseReq req = new PipeSubscribeCloseReq();
+
+ req.version = closeReq.version;
+ req.type = closeReq.type;
+ req.body = closeReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeCloseReq that = (PipeSubscribeCloseReq) obj;
+ return this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java
new file mode 100644
index 00000000000..f453b00d8f0
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribeCommitReq extends TPipeSubscribeReq {
+
+ private transient List<Pair<String, Integer>> committerKeyAndCommitIds = new
ArrayList<>();
+
+ public List<Pair<String, Integer>> getCommitterKeyAndCommitIds() {
+ return committerKeyAndCommitIds;
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeCommitReq`, called
by the subscription
+ * client.
+ */
+ public static PipeSubscribeCommitReq toTPipeSubscribeReq(
+ List<Pair<String, Integer>> committerKeyAndCommitIds) throws IOException
{
+ final PipeSubscribeCommitReq req = new PipeSubscribeCommitReq();
+
+ req.committerKeyAndCommitIds = committerKeyAndCommitIds;
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.COMMIT.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(committerKeyAndCommitIds.size(), outputStream);
+ for (Pair<String, Integer> committerKeyAndCommitId :
committerKeyAndCommitIds) {
+ ReadWriteIOUtils.write(committerKeyAndCommitId.left, outputStream);
+ ReadWriteIOUtils.write(committerKeyAndCommitId.right, outputStream);
+ }
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribeCommitReq fromTPipeSubscribeReq(TPipeSubscribeReq
commitReq) {
+ final PipeSubscribeCommitReq req = new PipeSubscribeCommitReq();
+
+ if (commitReq.body.hasRemaining()) {
+ int size = ReadWriteIOUtils.readInt(commitReq.body);
+ for (int i = 0; i < size; ++i) {
+ String committerKey = ReadWriteIOUtils.readString(commitReq.body);
+ int commitId = ReadWriteIOUtils.readInt(commitReq.body);
+ req.committerKeyAndCommitIds.add(new Pair<>(committerKey, commitId));
+ }
+ }
+
+ req.version = commitReq.version;
+ req.type = commitReq.type;
+ req.body = commitReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeCommitReq that = (PipeSubscribeCommitReq) obj;
+ return Objects.equals(this.committerKeyAndCommitIds,
that.committerKeyAndCommitIds)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(committerKeyAndCommitIds, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHandshakeReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHandshakeReq.java
new file mode 100644
index 00000000000..595a417111f
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHandshakeReq.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeSubscribeHandshakeReq extends TPipeSubscribeReq {
+
+ private transient ConsumerConfig consumerConfig = new ConsumerConfig();
+
+ public ConsumerConfig getConsumerConfig() {
+ return consumerConfig;
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeHandshakeReq`,
called by the subscription
+ * client.
+ */
+ public static PipeSubscribeHandshakeReq toTPipeSubscribeReq(ConsumerConfig
consumerConfig)
+ throws IOException {
+ final PipeSubscribeHandshakeReq req = new PipeSubscribeHandshakeReq();
+
+ req.consumerConfig = consumerConfig;
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.HANDSHAKE.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ consumerConfig.serialize(outputStream);
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribeHandshakeReq
fromTPipeSubscribeReq(TPipeSubscribeReq handshakeReq) {
+ final PipeSubscribeHandshakeReq req = new PipeSubscribeHandshakeReq();
+
+ if (handshakeReq.body.hasRemaining()) {
+ req.consumerConfig = ConsumerConfig.deserialize(handshakeReq.body);
+ }
+
+ req.version = handshakeReq.version;
+ req.type = handshakeReq.type;
+ req.body = handshakeReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeHandshakeReq that = (PipeSubscribeHandshakeReq) obj;
+ return Objects.equals(this.consumerConfig, that.consumerConfig)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(consumerConfig, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java
new file mode 100644
index 00000000000..e3ab8bbca6b
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class PipeSubscribeHeartbeatReq extends TPipeSubscribeReq {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeHeartbeatReq`,
called by the subscription
+ * client.
+ */
+ public static PipeSubscribeHeartbeatReq toTPipeSubscribeReq() throws
IOException {
+ final PipeSubscribeHeartbeatReq req = new PipeSubscribeHeartbeatReq();
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.HEARTBEAT.getType();
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribeHeartbeatReq
fromTPipeSubscribeReq(TPipeSubscribeReq heartbeatReq) {
+ final PipeSubscribeHeartbeatReq req = new PipeSubscribeHeartbeatReq();
+
+ req.version = heartbeatReq.version;
+ req.type = heartbeatReq.type;
+ req.body = heartbeatReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeHeartbeatReq that = (PipeSubscribeHeartbeatReq) obj;
+ return this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribePollReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribePollReq.java
new file mode 100644
index 00000000000..4d79dd1e912
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribePollReq.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribePollReq extends TPipeSubscribeReq {
+
+ private transient List<String> topicNames = new ArrayList<>();;
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribePollReq`, called by
the subscription
+ * client.
+ */
+ public static PipeSubscribePollReq toTPipeSubscribeReq(List<String>
topicNames)
+ throws IOException {
+ final PipeSubscribePollReq req = new PipeSubscribePollReq();
+
+ req.topicNames = topicNames;
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.POLL.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.writeStringList(topicNames, outputStream);
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribePollReq fromTPipeSubscribeReq(TPipeSubscribeReq
pollReq) {
+ final PipeSubscribePollReq req = new PipeSubscribePollReq();
+
+ if (pollReq.body.hasRemaining()) {
+ req.topicNames = ReadWriteIOUtils.readStringList(pollReq.body);
+ }
+
+ req.version = pollReq.version;
+ req.type = pollReq.type;
+ req.body = pollReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribePollReq that = (PipeSubscribePollReq) obj;
+ return Objects.equals(this.topicNames, that.topicNames)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicNames, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
new file mode 100644
index 00000000000..f9827e8f706
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum PipeSubscribeRequestType {
+ HANDSHAKE((short) 0),
+ HEARTBEAT((short) 1),
+ POLL((short) 2),
+ COMMIT((short) 3),
+ CLOSE((short) 4),
+ SUBSCRIBE((short) 5),
+ UNSUBSCRIBE((short) 6),
+ ;
+
+ private final short type;
+
+ PipeSubscribeRequestType(short type) {
+ this.type = type;
+ }
+
+ public short getType() {
+ return type;
+ }
+
+ private static final Map<Short, PipeSubscribeRequestType> TYPE_MAP =
+ Arrays.stream(PipeSubscribeRequestType.values())
+ .collect(
+ HashMap::new,
+ (typeMap, pipeRequestType) ->
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+ HashMap::putAll);
+
+ public static boolean isValidatedRequestType(short type) {
+ return TYPE_MAP.containsKey(type);
+ }
+
+ public static PipeSubscribeRequestType valueOf(short type) {
+ return TYPE_MAP.get(type);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestVersion.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestVersion.java
new file mode 100644
index 00000000000..8eb58db04a1
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestVersion.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+public enum PipeSubscribeRequestVersion {
+ VERSION_1((byte) 1),
+ ;
+
+ private final byte version;
+
+ PipeSubscribeRequestVersion(byte type) {
+ this.version = type;
+ }
+
+ public byte getVersion() {
+ return version;
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSubscribeReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSubscribeReq.java
new file mode 100644
index 00000000000..c9a502ce6d0
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSubscribeReq.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribeSubscribeReq extends TPipeSubscribeReq {
+
+ private transient List<String> topicNames = new ArrayList<>();
+
+ public List<String> getTopicNames() {
+ return topicNames;
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeSubscribeReq`,
called by the subscription
+ * client.
+ */
+ public static PipeSubscribeSubscribeReq toTPipeSubscribeReq(List<String>
topicNames)
+ throws IOException {
+ final PipeSubscribeSubscribeReq req = new PipeSubscribeSubscribeReq();
+
+ req.topicNames = topicNames;
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.SUBSCRIBE.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.writeStringList(topicNames, outputStream);
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribeSubscribeReq
fromTPipeSubscribeReq(TPipeSubscribeReq subscribeReq) {
+ final PipeSubscribeSubscribeReq req = new PipeSubscribeSubscribeReq();
+
+ if (subscribeReq.body.hasRemaining()) {
+ req.topicNames = ReadWriteIOUtils.readStringList(subscribeReq.body);
+ }
+
+ req.version = subscribeReq.version;
+ req.type = subscribeReq.type;
+ req.body = subscribeReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeSubscribeReq that = (PipeSubscribeSubscribeReq) obj;
+ return Objects.equals(this.topicNames, that.topicNames)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicNames, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeUnsubscribeReq.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeUnsubscribeReq.java
new file mode 100644
index 00000000000..f1b04c063ed
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeUnsubscribeReq.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribeUnsubscribeReq extends TPipeSubscribeReq {
+
+ private transient List<String> topicNames = new ArrayList<>();
+
+ public List<String> getTopicNames() {
+ return topicNames;
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeUnsubscribeReq`,
called by the
+ * subscription client.
+ */
+ public static PipeSubscribeUnsubscribeReq toTPipeSubscribeReq(List<String>
topicNames)
+ throws IOException {
+ final PipeSubscribeUnsubscribeReq req = new PipeSubscribeUnsubscribeReq();
+
+ req.topicNames = topicNames;
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.UNSUBSCRIBE.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.writeStringList(topicNames, outputStream);
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return req;
+ }
+
+ /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the
subscription server. */
+ public static PipeSubscribeUnsubscribeReq fromTPipeSubscribeReq(
+ TPipeSubscribeReq unsubscribeReq) {
+ final PipeSubscribeUnsubscribeReq req = new PipeSubscribeUnsubscribeReq();
+
+ if (unsubscribeReq.body.hasRemaining()) {
+ req.topicNames = ReadWriteIOUtils.readStringList(unsubscribeReq.body);
+ }
+
+ req.version = unsubscribeReq.version;
+ req.type = unsubscribeReq.type;
+ req.body = unsubscribeReq.body;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeUnsubscribeReq that = (PipeSubscribeUnsubscribeReq) obj;
+ return Objects.equals(this.topicNames, that.topicNames)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicNames, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/EnrichedTablets.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/EnrichedTablets.java
new file mode 100644
index 00000000000..17034327cab
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/EnrichedTablets.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class EnrichedTablets {
+
+ private transient String topicName;
+ private transient List<Tablet> tablets = new ArrayList<>();
+ private transient List<Pair<String, Integer>> committerKeyAndCommitIds = new
ArrayList<>();
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(topicName, stream);
+ ReadWriteIOUtils.write(tablets.size(), stream);
+ for (Tablet tablet : tablets) {
+ tablet.serialize(stream);
+ }
+ ReadWriteIOUtils.write(committerKeyAndCommitIds.size(), stream);
+ for (Pair<String, Integer> committerKeyAndCommitId :
committerKeyAndCommitIds) {
+ ReadWriteIOUtils.write(committerKeyAndCommitId.left, stream);
+ ReadWriteIOUtils.write(committerKeyAndCommitId.right, stream);
+ }
+ }
+
+ public static EnrichedTablets deserialize(ByteBuffer buffer) {
+ final EnrichedTablets enrichedTablets = new EnrichedTablets();
+ enrichedTablets.topicName = ReadWriteIOUtils.readString(buffer);
+ int size = ReadWriteIOUtils.readInt(buffer);
+ for (int i = 0; i < size; ++i) {
+ enrichedTablets.tablets.add(Tablet.deserialize(buffer));
+ }
+ size = ReadWriteIOUtils.readInt(buffer);
+ for (int i = 0; i < size; ++i) {
+ String committerKey = ReadWriteIOUtils.readString(buffer);
+ int commitId = ReadWriteIOUtils.readInt(buffer);
+ enrichedTablets.committerKeyAndCommitIds.add(new Pair<>(committerKey,
commitId));
+ }
+ return enrichedTablets;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ EnrichedTablets that = (EnrichedTablets) obj;
+ return Objects.equals(this.topicName, that.topicName)
+ && Objects.equals(this.tablets, that.tablets)
+ && Objects.equals(this.committerKeyAndCommitIds,
that.committerKeyAndCommitIds);
+ }
+
+ @Override
+ public int hashCode() {
+ // TODO: Tablet hashCode
+ return Objects.hash(topicName, tablets, committerKeyAndCommitIds);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCloseResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCloseResp.java
new file mode 100644
index 00000000000..1fdbcbc9a69
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCloseResp.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+public class PipeSubscribeCloseResp extends TPipeSubscribeResp {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeCloseResp`, called
by the subscription
+ * server.
+ */
+ public static PipeSubscribeCloseResp toTPipeSubscribeResp(TSStatus status) {
+ final PipeSubscribeCloseResp resp = new PipeSubscribeCloseResp();
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ return resp;
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribeCloseResp
fromTPipeSubscribeResp(TPipeSubscribeResp closeResp) {
+ final PipeSubscribeCloseResp resp = new PipeSubscribeCloseResp();
+
+ resp.status = closeResp.status;
+ resp.version = closeResp.version;
+ resp.type = closeResp.type;
+ resp.body = closeResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeCloseResp that = (PipeSubscribeCloseResp) obj;
+ return Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCommitResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCommitResp.java
new file mode 100644
index 00000000000..59407433370
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCommitResp.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+public class PipeSubscribeCommitResp extends TPipeSubscribeResp {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeCommitResp`, called
by the subscription
+ * server.
+ */
+ public static PipeSubscribeCommitResp toTPipeSubscribeResp(TSStatus status) {
+ final PipeSubscribeCommitResp resp = new PipeSubscribeCommitResp();
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ return resp;
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribeCommitResp
fromTPipeSubscribeResp(TPipeSubscribeResp commitResp) {
+ final PipeSubscribeCommitResp resp = new PipeSubscribeCommitResp();
+
+ resp.status = commitResp.status;
+ resp.version = commitResp.version;
+ resp.type = commitResp.type;
+ resp.body = commitResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeCommitResp that = (PipeSubscribeCommitResp) obj;
+ return Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
new file mode 100644
index 00000000000..9a4e55aeb13
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeSubscribeHandshakeResp extends TPipeSubscribeResp {
+
+ // dataNodeId -> clientRpcEndPoint
+ private transient Map<Integer, TEndPoint> endPoints = new HashMap<>();
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeHandshakeResp`,
called by the subscription
+ * server.
+ */
+ public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(
+ TSStatus status, Map<Integer, TEndPoint> endPoints) {
+ final PipeSubscribeHandshakeResp resp = new PipeSubscribeHandshakeResp();
+
+ resp.endPoints = endPoints;
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(endPoints.size(), outputStream);
+ for (Map.Entry<Integer, TEndPoint> endPoint : endPoints.entrySet()) {
+ ReadWriteIOUtils.write(endPoint.getKey(), outputStream);
+ ReadWriteIOUtils.write(endPoint.getValue().ip, outputStream);
+ ReadWriteIOUtils.write(endPoint.getValue().port, outputStream);
+ }
+ resp.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ } catch (IOException e) {
+ resp.status =
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, e.getMessage());
+ return resp;
+ }
+
+ return resp;
+ }
+
+ public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(TSStatus
status) {
+ return toTPipeSubscribeResp(status, Collections.emptyMap());
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribeHandshakeResp fromTPipeSubscribeResp(
+ TPipeSubscribeResp handshakeResp) {
+ final PipeSubscribeHandshakeResp resp = new PipeSubscribeHandshakeResp();
+
+ if (handshakeResp.body.hasRemaining()) {
+ int size = ReadWriteIOUtils.readInt(handshakeResp.body);
+ for (int i = 0; i < size; ++i) {
+ final int id = ReadWriteIOUtils.readInt(handshakeResp.body);
+ final String ip = ReadWriteIOUtils.readString(handshakeResp.body);
+ final int port = ReadWriteIOUtils.readInt(handshakeResp.body);
+ resp.endPoints.put(id, new TEndPoint(ip, port));
+ }
+ }
+
+ resp.status = handshakeResp.status;
+ resp.version = handshakeResp.version;
+ resp.type = handshakeResp.type;
+ resp.body = handshakeResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeHandshakeResp that = (PipeSubscribeHandshakeResp) obj;
+ return Objects.equals(this.endPoints, that.endPoints)
+ && Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(endPoints, status, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
new file mode 100644
index 00000000000..42836a3a5a8
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+public class PipeSubscribeHeartbeatResp extends TPipeSubscribeResp {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeHeartbeatResp`,
called by the subscription
+ * server.
+ */
+ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(TSStatus
status) {
+ final PipeSubscribeHeartbeatResp resp = new PipeSubscribeHeartbeatResp();
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ return resp;
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribeHeartbeatResp fromTPipeSubscribeResp(
+ TPipeSubscribeResp heartbeatResp) {
+ final PipeSubscribeHeartbeatResp resp = new PipeSubscribeHeartbeatResp();
+
+ resp.status = heartbeatResp.status;
+ resp.version = heartbeatResp.version;
+ resp.type = heartbeatResp.type;
+ resp.body = heartbeatResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
+ return Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
new file mode 100644
index 00000000000..6e4b2d52a0d
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribePollResp extends TPipeSubscribeResp {
+
+ private transient List<EnrichedTablets> enrichedTabletsList = new
ArrayList<>();
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribePollResp`, called by
the subscription
+ * server.
+ */
+ public static PipeSubscribePollResp toTPipeSubscribeResp(
+ TSStatus status, List<EnrichedTablets> enrichedTabletsList) {
+ final PipeSubscribePollResp resp = new PipeSubscribePollResp();
+
+ resp.enrichedTabletsList = enrichedTabletsList;
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.POLL_TABLETS.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(enrichedTabletsList.size(), outputStream);
+ for (EnrichedTablets enrichedTablets : enrichedTabletsList) {
+ enrichedTablets.serialize(outputStream);
+ }
+ resp.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ } catch (IOException e) {
+ resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR,
e.getMessage());
+ }
+
+ return resp;
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribePollResp
fromTPipeSubscribeResp(TPipeSubscribeResp pollResp) {
+ final PipeSubscribePollResp resp = new PipeSubscribePollResp();
+
+ if (pollResp.body.hasRemaining()) {
+ int size = ReadWriteIOUtils.readInt(pollResp.body);
+ for (int i = 0; i < size; ++i) {
+
resp.enrichedTabletsList.add(EnrichedTablets.deserialize(pollResp.body));
+ }
+ }
+
+ resp.status = pollResp.status;
+ resp.version = pollResp.version;
+ resp.type = pollResp.type;
+ resp.body = pollResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribePollResp that = (PipeSubscribePollResp) obj;
+ return Objects.equals(this.enrichedTabletsList, that.enrichedTabletsList)
+ && Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(enrichedTabletsList, status, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseType.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseType.java
new file mode 100644
index 00000000000..aff0ed99e78
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum PipeSubscribeResponseType {
+ ACK((short) 0),
+ POLL_TABLETS((short) 1),
+ ;
+
+ private final short type;
+
+ PipeSubscribeResponseType(short type) {
+ this.type = type;
+ }
+
+ public short getType() {
+ return type;
+ }
+
+ private static final Map<Short, PipeSubscribeResponseType> TYPE_MAP =
+ Arrays.stream(PipeSubscribeResponseType.values())
+ .collect(
+ HashMap::new,
+ (typeMap, pipeRequestType) ->
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+ HashMap::putAll);
+
+ public static boolean isValidatedRequestType(short type) {
+ return TYPE_MAP.containsKey(type);
+ }
+
+ public static PipeSubscribeResponseType valueOf(short type) {
+ return TYPE_MAP.get(type);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseVersion.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseVersion.java
new file mode 100644
index 00000000000..d2eeacd6e34
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseVersion.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+public enum PipeSubscribeResponseVersion {
+ VERSION_1((byte) 1),
+ ;
+
+ private final byte version;
+
+ PipeSubscribeResponseVersion(byte type) {
+ this.version = type;
+ }
+
+ public byte getVersion() {
+ return version;
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
new file mode 100644
index 00000000000..64337741907
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+public class PipeSubscribeSubscribeResp extends TPipeSubscribeResp {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeSubscribeResp`,
called by the subscription
+ * server.
+ */
+ public static PipeSubscribeSubscribeResp toTPipeSubscribeResp(TSStatus
status) {
+ final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ return resp;
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribeSubscribeResp fromTPipeSubscribeResp(
+ TPipeSubscribeResp subscribeResp) {
+ final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
+
+ resp.status = subscribeResp.status;
+ resp.version = subscribeResp.version;
+ resp.type = subscribeResp.type;
+ resp.body = subscribeResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeSubscribeResp that = (PipeSubscribeSubscribeResp) obj;
+ return Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, version, type, body);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
new file mode 100644
index 00000000000..6594a1a7919
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+public class PipeSubscribeUnsubscribeResp extends TPipeSubscribeResp {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeUnsubscribeResp`,
called by the
+ * subscription server.
+ */
+ public static PipeSubscribeUnsubscribeResp toTPipeSubscribeResp(TSStatus
status) {
+ final PipeSubscribeUnsubscribeResp resp = new
PipeSubscribeUnsubscribeResp();
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ return resp;
+ }
+
+ /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
+ public static PipeSubscribeUnsubscribeResp fromTPipeSubscribeResp(
+ TPipeSubscribeResp unsubscribeResp) {
+ final PipeSubscribeUnsubscribeResp resp = new
PipeSubscribeUnsubscribeResp();
+
+ resp.status = unsubscribeResp.status;
+ resp.version = unsubscribeResp.version;
+ resp.type = unsubscribeResp.type;
+ resp.body = unsubscribeResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeSubscribeUnsubscribeResp that = (PipeSubscribeUnsubscribeResp) obj;
+ return Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, version, type, body);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index d386f45e167..03b28a97c6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -114,6 +114,8 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import
org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
@@ -2565,6 +2567,12 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return PipeAgent.receiver().thrift().receive(req, partitionFetcher,
schemaFetcher);
}
+ @Override
+ public TPipeSubscribeResp pipeSubscribe(TPipeSubscribeReq req) {
+ // TODO: SubscriptionAgent
+ return null;
+ }
+
@Override
public TSBackupConfigurationResp getBackupConfiguration() {
return new
TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 2b2f5585726..d96a04d7b21 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -490,6 +490,19 @@ struct TPipeTransferResp {
2:optional binary body
}
+struct TPipeSubscribeReq {
+ 1:required i8 version
+ 2:required i16 type
+ 3:optional binary body
+}
+
+struct TPipeSubscribeResp {
+ 1:required common.TSStatus status
+ 2:required i8 version
+ 3:required i16 type
+ 4:optional binary body
+}
+
struct TSBackupConfigurationResp {
1: required common.TSStatus status
2: optional bool enableOperationSync
@@ -637,6 +650,8 @@ service IClientRPCService {
TPipeTransferResp pipeTransfer(TPipeTransferReq req);
+ TPipeSubscribeResp pipeSubscribe(TPipeSubscribeReq req);
+
TSBackupConfigurationResp getBackupConfiguration();
TSConnectionInfoResp fetchAllConnectionsInfo();