This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b87c6463ff1 Pipe Subscription: initialize the subscription client RPC payload (#12121)
b87c6463ff1 is described below

commit b87c6463ff1e16823b07197375e75b75a6634c1b
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Mar 6 23:45:05 2024 +0800

    Pipe Subscription: initialize the subscription client RPC payload (#12121)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  11 ++
 .../iotdb/rpc/subscription/EnrichedRowRecord.java  |  34 ++++++
 .../rpc/subscription/IoTDBSubscriptionDataSet.java |  35 ++++++
 .../payload/request/ConsumerConfig.java            |  85 ++++++++++++++
 .../payload/request/PipeSubscribeCloseReq.java     |  74 ++++++++++++
 .../payload/request/PipeSubscribeCommitReq.java    | 110 ++++++++++++++++++
 .../payload/request/PipeSubscribeHandshakeReq.java |  97 ++++++++++++++++
 .../payload/request/PipeSubscribeHeartbeatReq.java |  75 +++++++++++++
 .../payload/request/PipeSubscribePollReq.java      |  96 ++++++++++++++++
 .../payload/request/PipeSubscribeRequestType.java  |  60 ++++++++++
 .../request/PipeSubscribeRequestVersion.java       |  35 ++++++
 .../payload/request/PipeSubscribeSubscribeReq.java | 100 +++++++++++++++++
 .../request/PipeSubscribeUnsubscribeReq.java       | 101 +++++++++++++++++
 .../payload/response/EnrichedTablets.java          |  89 +++++++++++++++
 .../payload/response/PipeSubscribeCloseResp.java   |  78 +++++++++++++
 .../payload/response/PipeSubscribeCommitResp.java  |  78 +++++++++++++
 .../response/PipeSubscribeHandshakeResp.java       | 125 +++++++++++++++++++++
 .../response/PipeSubscribeHeartbeatResp.java       |  79 +++++++++++++
 .../payload/response/PipeSubscribePollResp.java    | 110 ++++++++++++++++++
 .../response/PipeSubscribeResponseType.java        |  55 +++++++++
 .../response/PipeSubscribeResponseVersion.java     |  35 ++++++
 .../response/PipeSubscribeSubscribeResp.java       |  79 +++++++++++++
 .../response/PipeSubscribeUnsubscribeResp.java     |  79 +++++++++++++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   8 ++
 .../thrift-datanode/src/main/thrift/client.thrift  |  15 +++
 25 files changed, 1743 insertions(+)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b8407bb86de..19a2a282764 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -221,6 +221,17 @@ public enum TSStatusCode {
   PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR(1805),
   PIPE_NOT_EXIST_ERROR(1806),
   PIPE_PUSH_META_ERROR(1807),
+
+  // Subscription
+  SUBSCRIPTION_VERSION_ERROR(1900),
+  SUBSCRIPTION_TYPE_ERROR(1901),
+  SUBSCRIPTION_HANDSHAKE_ERROR(1902),
+  SUBSCRIPTION_HEARTBEAT_ERROR(1903),
+  SUBSCRIPTION_POLL_ERROR(1904),
+  SUBSCRIPTION_COMMIT_ERROR(1905),
+  SUBSCRIPTION_CLOSE_ERROR(1906),
+  SUBSCRIPTION_SUBSCRIBE_ERROR(1907),
+  SUBSCRIPTION_UNSUBSCRIBE_ERROR(1908),
   ;
 
   private final int statusCode;
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/EnrichedRowRecord.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/EnrichedRowRecord.java
new file mode 100644
index 00000000000..26ad041934a
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/EnrichedRowRecord.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.util.List;
+
+public class EnrichedRowRecord {
+
+  private String topicName;
+  private RowRecord record;
+  private List<String> columnNameList;
+  private List<String> columnTypeList;
+
+  // TODO: translate EnrichedTablets to EnrichedRowRecord
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/IoTDBSubscriptionDataSet.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/IoTDBSubscriptionDataSet.java
new file mode 100644
index 00000000000..894c15e0e48
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/IoTDBSubscriptionDataSet.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription;
+
+import java.util.Iterator;
+
+public class IoTDBSubscriptionDataSet implements Iterator<EnrichedRowRecord> {
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public EnrichedRowRecord next() {
+    return null;
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/ConsumerConfig.java
new file mode 100644
index 00000000000..a312182871c
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/ConsumerConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ConsumerConfig {
+
+  private transient String consumerGroupID;
+  private transient String consumerClientID;
+
+  // TODO: more configs
+
+  public String getConsumerClientID() {
+    return consumerClientID;
+  }
+
+  public String getConsumerGroupID() {
+    return consumerGroupID;
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(consumerGroupID, stream);
+    ReadWriteIOUtils.write(consumerClientID, stream);
+  }
+
+  public static ConsumerConfig deserialize(ByteBuffer buffer) {
+    final ConsumerConfig consumerConfig = new ConsumerConfig();
+    consumerConfig.consumerGroupID = ReadWriteIOUtils.readString(buffer);
+    consumerConfig.consumerClientID = ReadWriteIOUtils.readString(buffer);
+    return consumerConfig;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ConsumerConfig that = (ConsumerConfig) obj;
+    return Objects.equals(this.consumerGroupID, that.consumerGroupID)
+        && Objects.equals(this.consumerClientID, that.consumerClientID);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(consumerGroupID, consumerClientID);
+  }
+
+  @Override
+  public String toString() {
+    return "ConsumerConfig{"
+        + "consumerGroupID='"
+        + consumerGroupID
+        + "', consumerClientID="
+        + consumerClientID
+        + "}";
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCloseReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCloseReq.java
new file mode 100644
index 00000000000..724f0248943
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCloseReq.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+
+import java.util.Objects;
+
+public class PipeSubscribeCloseReq extends TPipeSubscribeReq {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeCloseReq`, called by 
the subscription
+   * client.
+   */
+  public static PipeSubscribeCloseReq toTPipeSubscribeReq() {
+    final PipeSubscribeCloseReq req = new PipeSubscribeCloseReq();
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.CLOSE.getType();
+
+    return req;
+  }
+
+  /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the 
subscription server. */
+  public static PipeSubscribeCloseReq fromTPipeSubscribeReq(TPipeSubscribeReq 
closeReq) {
+    final PipeSubscribeCloseReq req = new PipeSubscribeCloseReq();
+
+    req.version = closeReq.version;
+    req.type = closeReq.type;
+    req.body = closeReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeCloseReq that = (PipeSubscribeCloseReq) obj;
+    return this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(version, type, body);
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java
new file mode 100644
index 00000000000..f453b00d8f0
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribeCommitReq extends TPipeSubscribeReq {
+
+  private transient List<Pair<String, Integer>> committerKeyAndCommitIds = new 
ArrayList<>();
+
+  public List<Pair<String, Integer>> getCommitterKeyAndCommitIds() {
+    return committerKeyAndCommitIds;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeCommitReq`, called 
by the subscription
+   * client.
+   */
+  public static PipeSubscribeCommitReq toTPipeSubscribeReq(
+      List<Pair<String, Integer>> committerKeyAndCommitIds) throws IOException 
{
+    final PipeSubscribeCommitReq req = new PipeSubscribeCommitReq();
+
+    req.committerKeyAndCommitIds = committerKeyAndCommitIds;
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.COMMIT.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(committerKeyAndCommitIds.size(), outputStream);
+      for (Pair<String, Integer> committerKeyAndCommitId : 
committerKeyAndCommitIds) {
+        ReadWriteIOUtils.write(committerKeyAndCommitId.left, outputStream);
+        ReadWriteIOUtils.write(committerKeyAndCommitId.right, outputStream);
+      }
+      req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return req;
+  }
+
+  /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the 
subscription server. */
+  public static PipeSubscribeCommitReq fromTPipeSubscribeReq(TPipeSubscribeReq 
commitReq) {
+    final PipeSubscribeCommitReq req = new PipeSubscribeCommitReq();
+
+    if (commitReq.body.hasRemaining()) {
+      int size = ReadWriteIOUtils.readInt(commitReq.body);
+      for (int i = 0; i < size; ++i) {
+        String committerKey = ReadWriteIOUtils.readString(commitReq.body);
+        int commitId = ReadWriteIOUtils.readInt(commitReq.body);
+        req.committerKeyAndCommitIds.add(new Pair<>(committerKey, commitId));
+      }
+    }
+
+    req.version = commitReq.version;
+    req.type = commitReq.type;
+    req.body = commitReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeCommitReq that = (PipeSubscribeCommitReq) obj;
+    return Objects.equals(this.committerKeyAndCommitIds, 
that.committerKeyAndCommitIds)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(committerKeyAndCommitIds, version, type, body);
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHandshakeReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHandshakeReq.java
new file mode 100644
index 00000000000..595a417111f
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHandshakeReq.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeSubscribeHandshakeReq extends TPipeSubscribeReq {
+
+  private transient ConsumerConfig consumerConfig = new ConsumerConfig();
+
+  public ConsumerConfig getConsumerConfig() {
+    return consumerConfig;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeHandshakeReq`, 
called by the subscription
+   * client.
+   */
+  public static PipeSubscribeHandshakeReq toTPipeSubscribeReq(ConsumerConfig 
consumerConfig)
+      throws IOException {
+    final PipeSubscribeHandshakeReq req = new PipeSubscribeHandshakeReq();
+
+    req.consumerConfig = consumerConfig;
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.HANDSHAKE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      consumerConfig.serialize(outputStream);
+      req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return req;
+  }
+
+  /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the 
subscription server. */
+  public static PipeSubscribeHandshakeReq 
fromTPipeSubscribeReq(TPipeSubscribeReq handshakeReq) {
+    final PipeSubscribeHandshakeReq req = new PipeSubscribeHandshakeReq();
+
+    if (handshakeReq.body.hasRemaining()) {
+      req.consumerConfig = ConsumerConfig.deserialize(handshakeReq.body);
+    }
+
+    req.version = handshakeReq.version;
+    req.type = handshakeReq.type;
+    req.body = handshakeReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeHandshakeReq that = (PipeSubscribeHandshakeReq) obj;
+    return Objects.equals(this.consumerConfig, that.consumerConfig)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(consumerConfig, version, type, body);
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java
new file mode 100644
index 00000000000..e3ab8bbca6b
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class PipeSubscribeHeartbeatReq extends TPipeSubscribeReq {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeHeartbeatReq`, 
called by the subscription
+   * client.
+   */
+  public static PipeSubscribeHeartbeatReq toTPipeSubscribeReq() throws 
IOException {
+    final PipeSubscribeHeartbeatReq req = new PipeSubscribeHeartbeatReq();
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.HEARTBEAT.getType();
+
+    return req;
+  }
+
+  /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the 
subscription server. */
+  public static PipeSubscribeHeartbeatReq 
fromTPipeSubscribeReq(TPipeSubscribeReq heartbeatReq) {
+    final PipeSubscribeHeartbeatReq req = new PipeSubscribeHeartbeatReq();
+
+    req.version = heartbeatReq.version;
+    req.type = heartbeatReq.type;
+    req.body = heartbeatReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeHeartbeatReq that = (PipeSubscribeHeartbeatReq) obj;
+    return this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(version, type, body);
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribePollReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribePollReq.java
new file mode 100644
index 00000000000..4d79dd1e912
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribePollReq.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribePollReq extends TPipeSubscribeReq {
+
+  private transient List<String> topicNames = new ArrayList<>();;
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribePollReq`, called by 
the subscription
+   * client.
+   */
+  public static PipeSubscribePollReq toTPipeSubscribeReq(List<String> 
topicNames)
+      throws IOException {
+    final PipeSubscribePollReq req = new PipeSubscribePollReq();
+
+    req.topicNames = topicNames;
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.POLL.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.writeStringList(topicNames, outputStream);
+      req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return req;
+  }
+
+  /** Deserialize `TPipeSubscribeReq` to obtain parameters, called by the 
subscription server. */
+  public static PipeSubscribePollReq fromTPipeSubscribeReq(TPipeSubscribeReq 
pollReq) {
+    final PipeSubscribePollReq req = new PipeSubscribePollReq();
+
+    if (pollReq.body.hasRemaining()) {
+      req.topicNames = ReadWriteIOUtils.readStringList(pollReq.body);
+    }
+
+    req.version = pollReq.version;
+    req.type = pollReq.type;
+    req.body = pollReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribePollReq that = (PipeSubscribePollReq) obj;
+    return Objects.equals(this.topicNames, that.topicNames)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicNames, version, type, body);
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
new file mode 100644
index 00000000000..f9827e8f706
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum PipeSubscribeRequestType {
+  HANDSHAKE((short) 0),
+  HEARTBEAT((short) 1),
+  POLL((short) 2),
+  COMMIT((short) 3),
+  CLOSE((short) 4),
+  SUBSCRIBE((short) 5),
+  UNSUBSCRIBE((short) 6),
+  ;
+
+  private final short type;
+
+  PipeSubscribeRequestType(short type) {
+    this.type = type;
+  }
+
+  public short getType() {
+    return type;
+  }
+
+  private static final Map<Short, PipeSubscribeRequestType> TYPE_MAP =
+      Arrays.stream(PipeSubscribeRequestType.values())
+          .collect(
+              HashMap::new,
+              (typeMap, pipeRequestType) -> 
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+              HashMap::putAll);
+
+  public static boolean isValidatedRequestType(short type) {
+    return TYPE_MAP.containsKey(type);
+  }
+
+  public static PipeSubscribeRequestType valueOf(short type) {
+    return TYPE_MAP.get(type);
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestVersion.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestVersion.java
new file mode 100644
index 00000000000..8eb58db04a1
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestVersion.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+public enum PipeSubscribeRequestVersion {
+  VERSION_1((byte) 1),
+  ;
+
+  private final byte version;
+
+  PipeSubscribeRequestVersion(byte type) {
+    this.version = type;
+  }
+
+  public byte getVersion() {
+    return version;
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSubscribeReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSubscribeReq.java
new file mode 100644
index 00000000000..c9a502ce6d0
--- /dev/null
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSubscribeReq.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Subscribe request carrying a list of topic names. The names are encoded into the {@code body}
+ * of the underlying {@link TPipeSubscribeReq} and decoded again on the receiving side.
+ */
+public class PipeSubscribeSubscribeReq extends TPipeSubscribeReq {
+
+  // Decoded view of `body`; defaults to an empty list until a factory method sets it.
+  private transient List<String> topicNames = new ArrayList<>();
+
+  /** Returns the topic names held by this request (an empty list by default). */
+  public List<String> getTopicNames() {
+    return topicNames;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeSubscribeReq`, called by the subscription
+   * client.
+   *
+   * @param topicNames topic names to write into the request body
+   * @return a request whose version, type and body are populated
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  public static PipeSubscribeSubscribeReq toTPipeSubscribeReq(List<String> topicNames)
+      throws IOException {
+    final PipeSubscribeSubscribeReq req = new PipeSubscribeSubscribeReq();
+
+    req.topicNames = topicNames;
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.SUBSCRIBE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.writeStringList(topicNames, outputStream);
+      // Wrap only the bytes actually written, not the whole backing array.
+      req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return req;
+  }
+
+  /**
+   * Deserialize `TPipeSubscribeReq` to obtain parameters, called by the subscription server.
+   *
+   * <p>When the incoming request has no body, or an empty one, the topic names stay at their
+   * default empty list.
+   *
+   * @param subscribeReq the raw request received over RPC
+   * @return a typed request sharing the version, type and body of {@code subscribeReq}
+   */
+  public static PipeSubscribeSubscribeReq fromTPipeSubscribeReq(TPipeSubscribeReq subscribeReq) {
+    final PipeSubscribeSubscribeReq req = new PipeSubscribeSubscribeReq();
+
+    // `body` may be unset on the incoming request; guard against it instead of failing with a
+    // NullPointerException.
+    if (Objects.nonNull(subscribeReq.body) && subscribeReq.body.hasRemaining()) {
+      // NOTE(review): reading presumably advances the position of the shared buffer - confirm.
+      req.topicNames = ReadWriteIOUtils.readStringList(subscribeReq.body);
+    }
+
+    req.version = subscribeReq.version;
+    req.type = subscribeReq.type;
+    req.body = subscribeReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Two requests are equal when topic names, version, type and body are all equal. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeSubscribeReq that = (PipeSubscribeSubscribeReq) obj;
+    return Objects.equals(this.topicNames, that.topicNames)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicNames, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeUnsubscribeReq.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeUnsubscribeReq.java
new file mode 100644
index 00000000000..f1b04c063ed
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeUnsubscribeReq.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Unsubscribe request carrying a list of topic names. The names are encoded into the {@code body}
+ * of the underlying {@link TPipeSubscribeReq} and decoded again on the receiving side.
+ */
+public class PipeSubscribeUnsubscribeReq extends TPipeSubscribeReq {
+
+  // Decoded view of `body`; defaults to an empty list until a factory method sets it.
+  private transient List<String> topicNames = new ArrayList<>();
+
+  /** Returns the topic names held by this request (an empty list by default). */
+  public List<String> getTopicNames() {
+    return topicNames;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeUnsubscribeReq`, called by the
+   * subscription client.
+   *
+   * @param topicNames topic names to write into the request body
+   * @return a request whose version, type and body are populated
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  public static PipeSubscribeUnsubscribeReq toTPipeSubscribeReq(List<String> topicNames)
+      throws IOException {
+    final PipeSubscribeUnsubscribeReq req = new PipeSubscribeUnsubscribeReq();
+
+    req.topicNames = topicNames;
+
+    req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+    req.type = PipeSubscribeRequestType.UNSUBSCRIBE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.writeStringList(topicNames, outputStream);
+      // Wrap only the bytes actually written, not the whole backing array.
+      req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return req;
+  }
+
+  /**
+   * Deserialize `TPipeSubscribeReq` to obtain parameters, called by the subscription server.
+   *
+   * <p>When the incoming request has no body, or an empty one, the topic names stay at their
+   * default empty list.
+   *
+   * @param unsubscribeReq the raw request received over RPC
+   * @return a typed request sharing the version, type and body of {@code unsubscribeReq}
+   */
+  public static PipeSubscribeUnsubscribeReq fromTPipeSubscribeReq(
+      TPipeSubscribeReq unsubscribeReq) {
+    final PipeSubscribeUnsubscribeReq req = new PipeSubscribeUnsubscribeReq();
+
+    // `body` may be unset on the incoming request; guard against it instead of failing with a
+    // NullPointerException.
+    if (Objects.nonNull(unsubscribeReq.body) && unsubscribeReq.body.hasRemaining()) {
+      // NOTE(review): reading presumably advances the position of the shared buffer - confirm.
+      req.topicNames = ReadWriteIOUtils.readStringList(unsubscribeReq.body);
+    }
+
+    req.version = unsubscribeReq.version;
+    req.type = unsubscribeReq.type;
+    req.body = unsubscribeReq.body;
+
+    return req;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Two requests are equal when topic names, version, type and body are all equal. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeUnsubscribeReq that = (PipeSubscribeUnsubscribeReq) obj;
+    return Objects.equals(this.topicNames, that.topicNames)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicNames, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/EnrichedTablets.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/EnrichedTablets.java
new file mode 100644
index 00000000000..17034327cab
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/EnrichedTablets.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A topic name with its tablets and a list of (committer key, commit id) pairs, which can be
+ * written to a stream and read back from a {@link ByteBuffer}.
+ */
+public class EnrichedTablets {
+
+  private transient String topicName;
+  private transient List<Tablet> tablets = new ArrayList<>();
+  private transient List<Pair<String, Integer>> committerKeyAndCommitIds = new ArrayList<>();
+
+  /**
+   * Writes the topic name, then the tablet count followed by each tablet, then the pair count
+   * followed by each (committer key, commit id) pair.
+   */
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(topicName, stream);
+
+    final int tabletCount = tablets.size();
+    ReadWriteIOUtils.write(tabletCount, stream);
+    for (final Tablet each : tablets) {
+      each.serialize(stream);
+    }
+
+    final int pairCount = committerKeyAndCommitIds.size();
+    ReadWriteIOUtils.write(pairCount, stream);
+    for (final Pair<String, Integer> keyAndId : committerKeyAndCommitIds) {
+      ReadWriteIOUtils.write(keyAndId.left, stream);
+      ReadWriteIOUtils.write(keyAndId.right, stream);
+    }
+  }
+
+  /** Reads back, in the same order, what {@link #serialize(DataOutputStream)} wrote. */
+  public static EnrichedTablets deserialize(ByteBuffer buffer) {
+    final EnrichedTablets result = new EnrichedTablets();
+    result.topicName = ReadWriteIOUtils.readString(buffer);
+
+    final int tabletCount = ReadWriteIOUtils.readInt(buffer);
+    for (int remaining = tabletCount; remaining > 0; --remaining) {
+      result.tablets.add(Tablet.deserialize(buffer));
+    }
+
+    final int pairCount = ReadWriteIOUtils.readInt(buffer);
+    for (int remaining = pairCount; remaining > 0; --remaining) {
+      final String committerKey = ReadWriteIOUtils.readString(buffer);
+      final int commitId = ReadWriteIOUtils.readInt(buffer);
+      result.committerKeyAndCommitIds.add(new Pair<>(committerKey, commitId));
+    }
+
+    return result;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equality covers the topic name, the tablets and the (committer key, commit id) pairs. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final EnrichedTablets other = (EnrichedTablets) obj;
+    if (!Objects.equals(this.topicName, other.topicName)) {
+      return false;
+    }
+    if (!Objects.equals(this.tablets, other.tablets)) {
+      return false;
+    }
+    return Objects.equals(this.committerKeyAndCommitIds, other.committerKeyAndCommitIds);
+  }
+
+  @Override
+  public int hashCode() {
+    // TODO: Tablet hashCode
+    return Objects.hash(topicName, tablets, committerKeyAndCommitIds);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCloseResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCloseResp.java
new file mode 100644
index 00000000000..1fdbcbc9a69
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCloseResp.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+/** Response to a close request; it carries a status and no payload of its own. */
+public class PipeSubscribeCloseResp extends TPipeSubscribeResp {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Builds a `PipeSubscribeCloseResp` from the given status, called by the subscription server.
+   * The version is set to `VERSION_1` and the type to `ACK`; the body is not set.
+   */
+  public static PipeSubscribeCloseResp toTPipeSubscribeResp(TSStatus status) {
+    final PipeSubscribeCloseResp ack = new PipeSubscribeCloseResp();
+    ack.type = PipeSubscribeResponseType.ACK.getType();
+    ack.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    ack.status = status;
+    return ack;
+  }
+
+  /**
+   * Copies status, version, type and body of a raw `TPipeSubscribeResp` into a
+   * `PipeSubscribeCloseResp`, called by the subscription client.
+   */
+  public static PipeSubscribeCloseResp fromTPipeSubscribeResp(TPipeSubscribeResp closeResp) {
+    final PipeSubscribeCloseResp copy = new PipeSubscribeCloseResp();
+    copy.body = closeResp.body;
+    copy.type = closeResp.type;
+    copy.version = closeResp.version;
+    copy.status = closeResp.status;
+    return copy;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equality covers status, version, type and body. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final PipeSubscribeCloseResp other = (PipeSubscribeCloseResp) obj;
+    if (this.version != other.version || this.type != other.type) {
+      return false;
+    }
+    return Objects.equals(this.status, other.status) && Objects.equals(this.body, other.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(status, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCommitResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCommitResp.java
new file mode 100644
index 00000000000..59407433370
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeCommitResp.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+/** Response to a commit request; it carries a status and no payload of its own. */
+public class PipeSubscribeCommitResp extends TPipeSubscribeResp {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Builds a `PipeSubscribeCommitResp` from the given status, called by the subscription server.
+   * The version is set to `VERSION_1` and the type to `ACK`; the body is not set.
+   */
+  public static PipeSubscribeCommitResp toTPipeSubscribeResp(TSStatus status) {
+    final PipeSubscribeCommitResp ack = new PipeSubscribeCommitResp();
+    ack.type = PipeSubscribeResponseType.ACK.getType();
+    ack.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    ack.status = status;
+    return ack;
+  }
+
+  /**
+   * Copies status, version, type and body of a raw `TPipeSubscribeResp` into a
+   * `PipeSubscribeCommitResp`, called by the subscription client.
+   */
+  public static PipeSubscribeCommitResp fromTPipeSubscribeResp(TPipeSubscribeResp commitResp) {
+    final PipeSubscribeCommitResp copy = new PipeSubscribeCommitResp();
+    copy.body = commitResp.body;
+    copy.type = commitResp.type;
+    copy.version = commitResp.version;
+    copy.status = commitResp.status;
+    return copy;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equality covers status, version, type and body. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final PipeSubscribeCommitResp other = (PipeSubscribeCommitResp) obj;
+    if (this.version != other.version || this.type != other.type) {
+      return false;
+    }
+    return Objects.equals(this.status, other.status) && Objects.equals(this.body, other.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(status, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
new file mode 100644
index 00000000000..9a4e55aeb13
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Handshake response carrying a map from data node id to that node's client RPC end point. The
+ * map is encoded into the {@code body} of the underlying {@link TPipeSubscribeResp}.
+ */
+public class PipeSubscribeHandshakeResp extends TPipeSubscribeResp {
+
+  // dataNodeId -> clientRpcEndPoint
+  private transient Map<Integer, TEndPoint> endPoints = new HashMap<>();
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeHandshakeResp`, called by the subscription
+   * server.
+   *
+   * <p>Body layout: entry count, then for each entry the data node id, the ip and the port. If
+   * writing fails, the status is replaced by a `SUBSCRIPTION_HANDSHAKE_ERROR` status and the body
+   * is left unset.
+   *
+   * @param status status to report to the client
+   * @param endPoints data node id to end point mapping to encode
+   * @return the populated response
+   */
+  public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(
+      TSStatus status, Map<Integer, TEndPoint> endPoints) {
+    final PipeSubscribeHandshakeResp resp = new PipeSubscribeHandshakeResp();
+
+    resp.endPoints = endPoints;
+
+    resp.status = status;
+    resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    resp.type = PipeSubscribeResponseType.ACK.getType();
+
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(endPoints.size(), outputStream);
+      for (Map.Entry<Integer, TEndPoint> endPoint : endPoints.entrySet()) {
+        ReadWriteIOUtils.write(endPoint.getKey(), outputStream);
+        ReadWriteIOUtils.write(endPoint.getValue().ip, outputStream);
+        ReadWriteIOUtils.write(endPoint.getValue().port, outputStream);
+      }
+      // Wrap only the bytes actually written, not the whole backing array.
+      resp.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    } catch (IOException e) {
+      // Report the failure through the status; the body stays unset in this case.
+      resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, e.getMessage());
+    }
+
+    return resp;
+  }
+
+  /** Same as the two-argument overload, with an empty end point map. */
+  public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(TSStatus status) {
+    return toTPipeSubscribeResp(status, Collections.emptyMap());
+  }
+
+  /**
+   * Deserialize `TPipeSubscribeResp` to obtain parameters, called by the subscription client.
+   *
+   * <p>When the incoming response has no body, or an empty one, the end point map stays empty.
+   *
+   * @param handshakeResp the raw response received over RPC
+   * @return a typed response sharing status, version, type and body of {@code handshakeResp}
+   */
+  public static PipeSubscribeHandshakeResp fromTPipeSubscribeResp(
+      TPipeSubscribeResp handshakeResp) {
+    final PipeSubscribeHandshakeResp resp = new PipeSubscribeHandshakeResp();
+
+    // The serializer leaves `body` unset when encoding fails, so it can be null here; guard
+    // against it instead of failing with a NullPointerException.
+    if (Objects.nonNull(handshakeResp.body) && handshakeResp.body.hasRemaining()) {
+      int size = ReadWriteIOUtils.readInt(handshakeResp.body);
+      for (int i = 0; i < size; ++i) {
+        final int id = ReadWriteIOUtils.readInt(handshakeResp.body);
+        final String ip = ReadWriteIOUtils.readString(handshakeResp.body);
+        final int port = ReadWriteIOUtils.readInt(handshakeResp.body);
+        resp.endPoints.put(id, new TEndPoint(ip, port));
+      }
+    }
+
+    resp.status = handshakeResp.status;
+    resp.version = handshakeResp.version;
+    resp.type = handshakeResp.type;
+    resp.body = handshakeResp.body;
+
+    return resp;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equality covers the end point map, status, version, type and body. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribeHandshakeResp that = (PipeSubscribeHandshakeResp) obj;
+    return Objects.equals(this.endPoints, that.endPoints)
+        && Objects.equals(this.status, that.status)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(endPoints, status, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
new file mode 100644
index 00000000000..42836a3a5a8
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+/** Response to a heartbeat request; it carries a status and no payload of its own. */
+public class PipeSubscribeHeartbeatResp extends TPipeSubscribeResp {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Builds a `PipeSubscribeHeartbeatResp` from the given status, called by the subscription
+   * server. The version is set to `VERSION_1` and the type to `ACK`; the body is not set.
+   */
+  public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(TSStatus status) {
+    final PipeSubscribeHeartbeatResp ack = new PipeSubscribeHeartbeatResp();
+    ack.type = PipeSubscribeResponseType.ACK.getType();
+    ack.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    ack.status = status;
+    return ack;
+  }
+
+  /**
+   * Copies status, version, type and body of a raw `TPipeSubscribeResp` into a
+   * `PipeSubscribeHeartbeatResp`, called by the subscription client.
+   */
+  public static PipeSubscribeHeartbeatResp fromTPipeSubscribeResp(
+      TPipeSubscribeResp heartbeatResp) {
+    final PipeSubscribeHeartbeatResp copy = new PipeSubscribeHeartbeatResp();
+    copy.body = heartbeatResp.body;
+    copy.type = heartbeatResp.type;
+    copy.version = heartbeatResp.version;
+    copy.status = heartbeatResp.status;
+    return copy;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equality covers status, version, type and body. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final PipeSubscribeHeartbeatResp other = (PipeSubscribeHeartbeatResp) obj;
+    if (this.version != other.version || this.type != other.type) {
+      return false;
+    }
+    return Objects.equals(this.status, other.status) && Objects.equals(this.body, other.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(status, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
new file mode 100644
index 00000000000..6e4b2d52a0d
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeSubscribePollResp extends TPipeSubscribeResp {
+
+  private transient List<EnrichedTablets> enrichedTabletsList = new 
ArrayList<>();
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribePollResp`, called by 
the subscription
+   * server.
+   */
+  public static PipeSubscribePollResp toTPipeSubscribeResp(
+      TSStatus status, List<EnrichedTablets> enrichedTabletsList) {
+    final PipeSubscribePollResp resp = new PipeSubscribePollResp();
+
+    resp.enrichedTabletsList = enrichedTabletsList;
+
+    resp.status = status;
+    resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    resp.type = PipeSubscribeResponseType.POLL_TABLETS.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(enrichedTabletsList.size(), outputStream);
+      for (EnrichedTablets enrichedTablets : enrichedTabletsList) {
+        enrichedTablets.serialize(outputStream);
+      }
+      resp.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    } catch (IOException e) {
+      resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, 
e.getMessage());
+    }
+
+    return resp;
+  }
+
+  /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the 
subscription client. */
+  public static PipeSubscribePollResp 
fromTPipeSubscribeResp(TPipeSubscribeResp pollResp) {
+    final PipeSubscribePollResp resp = new PipeSubscribePollResp();
+
+    if (pollResp.body.hasRemaining()) {
+      int size = ReadWriteIOUtils.readInt(pollResp.body);
+      for (int i = 0; i < size; ++i) {
+        
resp.enrichedTabletsList.add(EnrichedTablets.deserialize(pollResp.body));
+      }
+    }
+
+    resp.status = pollResp.status;
+    resp.version = pollResp.version;
+    resp.type = pollResp.type;
+    resp.body = pollResp.body;
+
+    return resp;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeSubscribePollResp that = (PipeSubscribePollResp) obj;
+    return Objects.equals(this.enrichedTabletsList, that.enrichedTabletsList)
+        && Objects.equals(this.status, that.status)
+        && this.version == that.version
+        && this.type == that.type
+        && Objects.equals(this.body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(enrichedTabletsList, status, version, type, body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseType.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseType.java
new file mode 100644
index 00000000000..aff0ed99e78
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Numeric type codes carried in the {@code type} field of a pipe subscribe response. */
+public enum PipeSubscribeResponseType {
+  ACK((short) 0),
+  POLL_TABLETS((short) 1),
+  ;
+
+  private final short type;
+
+  PipeSubscribeResponseType(short type) {
+    this.type = type;
+  }
+
+  /** Returns the numeric code of this response type. */
+  public short getType() {
+    return type;
+  }
+
+  /** Lookup table from numeric code to enum constant. */
+  private static final Map<Short, PipeSubscribeResponseType> TYPE_MAP = buildTypeMap();
+
+  /** Indexes every declared constant by its numeric code. */
+  private static Map<Short, PipeSubscribeResponseType> buildTypeMap() {
+    final Map<Short, PipeSubscribeResponseType> codeToType = new HashMap<>();
+    Arrays.stream(values())
+        .forEach(responseType -> codeToType.put(responseType.getType(), responseType));
+    return codeToType;
+  }
+
+  /** Returns whether {@code type} is the code of one of the declared response types. */
+  public static boolean isValidatedRequestType(short type) {
+    return TYPE_MAP.containsKey(type);
+  }
+
+  /** Returns the constant whose code is {@code type}, or {@code null} if there is none. */
+  public static PipeSubscribeResponseType valueOf(short type) {
+    return TYPE_MAP.get(type);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseVersion.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseVersion.java
new file mode 100644
index 00000000000..d2eeacd6e34
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeResponseVersion.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+/** Version numbers carried in the {@code version} field of a pipe subscribe response. */
+public enum PipeSubscribeResponseVersion {
+  VERSION_1((byte) 1),
+  ;
+
+  private final byte version;
+
+  PipeSubscribeResponseVersion(byte version) {
+    this.version = version;
+  }
+
+  /** Returns the numeric value of this version. */
+  public byte getVersion() {
+    return this.version;
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
new file mode 100644
index 00000000000..64337741907
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+/** Response to a subscribe request; it adds no fields of its own to `TPipeSubscribeResp`. */
+public class PipeSubscribeSubscribeResp extends TPipeSubscribeResp {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeSubscribeResp`, called by the
+   * subscription server.
+   *
+   * @param status the status to report back
+   * @return a response with the given status, version `VERSION_1` and type `ACK`; the body is
+   *     left unset
+   */
+  public static PipeSubscribeSubscribeResp toTPipeSubscribeResp(TSStatus status) {
+    final PipeSubscribeSubscribeResp ack = new PipeSubscribeSubscribeResp();
+    ack.type = PipeSubscribeResponseType.ACK.getType();
+    ack.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    ack.status = status;
+    return ack;
+  }
+
+  /**
+   * Deserialize `TPipeSubscribeResp` to obtain parameters, called by the subscription client.
+   *
+   * @param subscribeResp the raw Thrift response
+   * @return a new instance holding the status, version, type and body of {@code subscribeResp}
+   */
+  public static PipeSubscribeSubscribeResp fromTPipeSubscribeResp(
+      TPipeSubscribeResp subscribeResp) {
+    final PipeSubscribeSubscribeResp copy = new PipeSubscribeSubscribeResp();
+    copy.body = subscribeResp.body;
+    copy.type = subscribeResp.type;
+    copy.version = subscribeResp.version;
+    copy.status = subscribeResp.status;
+    return copy;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equal when the other object has the exact same class, status, version, type and body. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    final PipeSubscribeSubscribeResp other = (PipeSubscribeSubscribeResp) obj;
+    if (version != other.version || type != other.type) {
+      return false;
+    }
+    return Objects.equals(status, other.status) && Objects.equals(body, other.body);
+  }
+
+  /** Hash over the same four fields that {@code equals} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.status, this.version, this.type, this.body);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
new file mode 100644
index 00000000000..6594a1a7919
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+/** Response to an unsubscribe request; it adds no fields of its own to `TPipeSubscribeResp`. */
+public class PipeSubscribeUnsubscribeResp extends TPipeSubscribeResp {
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeUnsubscribeResp`, called by the
+   * subscription server.
+   *
+   * @param status the status to report back
+   * @return a response with the given status, version `VERSION_1` and type `ACK`; the body is
+   *     left unset
+   */
+  public static PipeSubscribeUnsubscribeResp toTPipeSubscribeResp(TSStatus status) {
+    final PipeSubscribeUnsubscribeResp ack = new PipeSubscribeUnsubscribeResp();
+    ack.type = PipeSubscribeResponseType.ACK.getType();
+    ack.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+    ack.status = status;
+    return ack;
+  }
+
+  /**
+   * Deserialize `TPipeSubscribeResp` to obtain parameters, called by the subscription client.
+   *
+   * @param unsubscribeResp the raw Thrift response
+   * @return a new instance holding the status, version, type and body of {@code unsubscribeResp}
+   */
+  public static PipeSubscribeUnsubscribeResp fromTPipeSubscribeResp(
+      TPipeSubscribeResp unsubscribeResp) {
+    final PipeSubscribeUnsubscribeResp copy = new PipeSubscribeUnsubscribeResp();
+    copy.body = unsubscribeResp.body;
+    copy.type = unsubscribeResp.type;
+    copy.version = unsubscribeResp.version;
+    copy.status = unsubscribeResp.status;
+    return copy;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /** Equal when the other object has the exact same class, status, version, type and body. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    final PipeSubscribeUnsubscribeResp other = (PipeSubscribeUnsubscribeResp) obj;
+    if (version != other.version || type != other.type) {
+      return false;
+    }
+    return Objects.equals(status, other.status) && Objects.equals(body, other.body);
+  }
+
+  /** Hash over the same four fields that {@code equals} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.status, this.version, this.type, this.body);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index d386f45e167..03b28a97c6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -114,6 +114,8 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import 
org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
@@ -2565,6 +2567,12 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     return PipeAgent.receiver().thrift().receive(req, partitionFetcher, 
schemaFetcher);
   }
 
+  /**
+   * Handler for the {@code pipeSubscribe} RPC declared in client.thrift.
+   *
+   * <p>Not implemented yet: the request is ignored and {@code null} is returned.
+   */
+  @Override
+  public TPipeSubscribeResp pipeSubscribe(TPipeSubscribeReq req) {
+    // TODO: SubscriptionAgent
+    // NOTE(review): a null return from a Thrift handler presumably reaches the client as a
+    // missing-result error rather than as a TSStatus -- confirm before clients call this RPC.
+    return null;
+  }
+
   @Override
   public TSBackupConfigurationResp getBackupConfiguration() {
     return new 
TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 2b2f5585726..d96a04d7b21 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -490,6 +490,19 @@ struct TPipeTransferResp {
   2:optional binary body
 }
 
+// Request envelope of the pipeSubscribe RPC: a version byte, a numeric request type and an
+// optional binary payload.
+struct TPipeSubscribeReq {
+  1:required i8 version
+  2:required i16 type
+  // NOTE(review): presumably filled by the PipeSubscribe*Req payload classes; confirm the layout
+  // there.
+  3:optional binary body
+}
+
+// Response envelope of the pipeSubscribe RPC.
+struct TPipeSubscribeResp {
+  1:required common.TSStatus status
+  // Set from PipeSubscribeResponseVersion by the Java response classes.
+  2:required i8 version
+  // Set from PipeSubscribeResponseType by the Java response classes.
+  3:required i16 type
+  // Optional, so it can be absent: the subscribe/unsubscribe responses never set it. For poll
+  // responses the Java client reads an int count followed by that many EnrichedTablets entries.
+  4:optional binary body
+}
+
 struct TSBackupConfigurationResp {
   1: required common.TSStatus status
   2: optional bool enableOperationSync
@@ -637,6 +650,8 @@ service IClientRPCService {
 
   TPipeTransferResp pipeTransfer(TPipeTransferReq req);
 
+  TPipeSubscribeResp pipeSubscribe(TPipeSubscribeReq req);
+
   TSBackupConfigurationResp getBackupConfiguration();
 
   TSConnectionInfoResp fetchAllConnectionsInfo();

Reply via email to