This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch revert-2635-apache_master_0204_fix_client_pool_leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8df7ce1e8f93bf73ab637ddc3a085f02ca263116
Author: chaow <[email protected]>
AuthorDate: Sat Feb 20 09:56:20 2021 +0800

    Revert "[IOTDB-1148]fix the client leak of client pool&use remote schema cache when check timeseries exist or not"
---
 .../iotdb/cluster/client/DataClientProvider.java   |   7 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   | 125 +++++++--------------
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  26 ++---
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  57 ++++------
 .../cluster/query/aggregate/ClusterAggregator.java |  16 +--
 .../query/groupby/RemoteGroupByExecutor.java       |  37 +++---
 .../query/last/ClusterLastQueryExecutor.java       |  32 +++---
 .../cluster/query/reader/ClusterReaderFactory.java |  16 +--
 .../iotdb/cluster/query/reader/DataSourceInfo.java |   3 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      |  16 ++-
 .../query/reader/RemoteSimpleSeriesReader.java     |  14 +--
 .../apache/iotdb/cluster/server/ClientServer.java  |  14 +--
 .../cluster/client/DataClientProviderTest.java     |   3 -
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   4 -
 14 files changed, 135 insertions(+), 235 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index d21e4e0..15e26a6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.cluster.client.sync.SyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -78,11 +77,7 @@ public class DataClientProvider {
   }
 
   /**
-   * IMPORTANT!!! After calling this function, the caller should make sure to 
call {@link
-   * org.apache.iotdb.cluster.utils.ClientUtils#putBackSyncClient(Client)} to 
put the client back
-   * into the client pool, otherwise there is a risk of client leakage.
-   *
-   * <p>Get a thrift client that will connect to "node" using the data port.
+   * Get a thrift client that will connect to "node" using the data port.
    *
    * @param node the node to be connected
    * @param timeout timeout threshold of connection
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 50c3732..7ef9640 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -372,27 +372,6 @@ public class CMManager extends MManager {
     return super.getSeriesSchema(device, measurement);
   }
 
-  /**
-   * Check whether the path exists.
-   *
-   * @param path a full path or a prefix path
-   */
-  @Override
-  public boolean isPathExist(PartialPath path) {
-    boolean localExist = super.isPathExist(path);
-    if (localExist) {
-      return true;
-    }
-
-    // search the cache
-    cacheLock.readLock().lock();
-    try {
-      return mRemoteMetaCache.containsKey(path);
-    } finally {
-      cacheLock.readLock().unlock();
-    }
-  }
-
   private static class RemoteMetaCache extends LRUCache<PartialPath, 
MeasurementMNode> {
 
     RemoteMetaCache(int cacheSize) {
@@ -418,10 +397,6 @@ public class CMManager extends MManager {
         return null;
       }
     }
-
-    public synchronized boolean containsKey(PartialPath key) {
-      return cache.containsKey(key);
-    }
   }
 
   /**
@@ -671,18 +646,14 @@ public class CMManager extends MManager {
               SyncClientAdaptor.getUnregisteredMeasurements(
                   client, partitionGroup.getHeader(), seriesList);
         } else {
-          SyncDataClient syncDataClient = null;
-          try {
-            syncDataClient =
-                metaGroupMember
-                    .getClientProvider()
-                    .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-            result =
-                
syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), 
seriesList);
-          } finally {
-            ClientUtils.putBackSyncClient(syncDataClient);
-          }
+          SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+          result = 
syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), 
seriesList);
+          ClientUtils.putBackSyncClient(syncDataClient);
         }
+
         if (result != null) {
           return result;
         }
@@ -855,21 +826,16 @@ public class CMManager extends MManager {
               .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
       schemas = SyncClientAdaptor.pullTimeseriesSchema(client, request);
     } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        PullSchemaResp pullSchemaResp = 
syncDataClient.pullTimeSeriesSchema(request);
-        ByteBuffer buffer = pullSchemaResp.schemaBytes;
-        int size = buffer.getInt();
-        schemas = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-          schemas.add(TimeseriesSchema.deserializeFrom(buffer));
-        }
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      PullSchemaResp pullSchemaResp = 
syncDataClient.pullTimeSeriesSchema(request);
+      ByteBuffer buffer = pullSchemaResp.schemaBytes;
+      int size = buffer.getInt();
+      schemas = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        schemas.add(TimeseriesSchema.deserializeFrom(buffer));
       }
     }
 
@@ -1094,16 +1060,12 @@ public class CMManager extends MManager {
               .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
       result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, 
withAlias);
     } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
-      }
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+      ClientUtils.putBackSyncClient(syncDataClient);
     }
 
     if (result != null) {
@@ -1218,16 +1180,12 @@ public class CMManager extends MManager {
               .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
       paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery);
     } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        paths = syncDataClient.getAllDevices(header, pathsToQuery);
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
-      }
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      paths = syncDataClient.getAllDevices(header, pathsToQuery);
+      ClientUtils.putBackSyncClient(syncDataClient);
     }
     return paths;
   }
@@ -1543,20 +1501,17 @@ public class CMManager extends MManager {
               .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
       resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, 
group.getHeader(), plan);
     } else {
-      SyncDataClient syncDataClient = null;
-      try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-          DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        plan.serialize(dataOutputStream);
-        resultBinary =
-            syncDataClient.getAllMeasurementSchema(
-                group.getHeader(), 
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
-      }
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+      DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
+      plan.serialize(dataOutputStream);
+      resultBinary =
+          syncDataClient.getAllMeasurementSchema(
+              group.getHeader(), 
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+      ClientUtils.putBackSyncClient(syncDataClient);
     }
     return resultBinary;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e98b67c..74855fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -221,21 +220,16 @@ public class MetaPuller {
               .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
       schemas = SyncClientAdaptor.pullMeasurementSchema(client, request);
     } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        PullSchemaResp pullSchemaResp = 
syncDataClient.pullTimeSeriesSchema(request);
-        ByteBuffer buffer = pullSchemaResp.schemaBytes;
-        int size = buffer.getInt();
-        schemas = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-          schemas.add(MeasurementSchema.deserializeFrom(buffer));
-        }
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      PullSchemaResp pullSchemaResp = 
syncDataClient.pullTimeSeriesSchema(request);
+      ByteBuffer buffer = pullSchemaResp.schemaBytes;
+      int size = buffer.getInt();
+      schemas = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        schemas.add(MeasurementSchema.deserializeFrom(buffer));
       }
     }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index e76be6c..d525963 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -254,18 +254,15 @@ public class ClusterPlanExecutor extends PlanExecutor {
               SyncClientAdaptor.getPathCount(
                   client, partitionGroup.getHeader(), pathsToQuery, level);
         } else {
-          SyncDataClient syncDataClient = null;
-          try {
-            syncDataClient =
-                metaGroupMember
-                    .getClientProvider()
-                    .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-            syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
-            count = syncDataClient.getPathCount(partitionGroup.getHeader(), 
pathsToQuery, level);
-          } finally {
-            ClientUtils.putBackSyncClient(syncDataClient);
-          }
+          SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+          syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
+          count = syncDataClient.getPathCount(partitionGroup.getHeader(), 
pathsToQuery, level);
+          ClientUtils.putBackSyncClient(syncDataClient);
         }
+
         logger.debug(
             "{}: get path count of {} from {}, result {}",
             metaGroupMember.getName(),
@@ -360,18 +357,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
               SyncClientAdaptor.getNodeList(
                   client, group.getHeader(), schemaPattern.getFullPath(), 
level);
         } else {
-          SyncDataClient syncDataClient = null;
-          try {
-            syncDataClient =
-                metaGroupMember
-                    .getClientProvider()
-                    .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-            paths =
-                syncDataClient.getNodeList(group.getHeader(), 
schemaPattern.getFullPath(), level);
-          } finally {
-            ClientUtils.putBackSyncClient(syncDataClient);
-          }
+          SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+          paths = syncDataClient.getNodeList(group.getHeader(), 
schemaPattern.getFullPath(), level);
+          ClientUtils.putBackSyncClient(syncDataClient);
         }
+
         if (paths != null) {
           break;
         }
@@ -384,6 +377,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
         Thread.currentThread().interrupt();
       }
     }
+
     return PartialPath.fromStringList(paths);
   }
 
@@ -473,18 +467,15 @@ public class ClusterPlanExecutor extends PlanExecutor {
           nextChildren =
               SyncClientAdaptor.getNextChildren(client, group.getHeader(), 
path.getFullPath());
         } else {
-          SyncDataClient syncDataClient = null;
-          try {
-            syncDataClient =
-                metaGroupMember
-                    .getClientProvider()
-                    .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-            nextChildren =
-                syncDataClient.getChildNodePathInNextLevel(group.getHeader(), 
path.getFullPath());
-          } finally {
-            ClientUtils.putBackSyncClient(syncDataClient);
-          }
+          SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+          nextChildren =
+              syncDataClient.getChildNodePathInNextLevel(group.getHeader(), 
path.getFullPath());
+          ClientUtils.putBackSyncClient(syncDataClient);
         }
+
         if (nextChildren != null) {
           break;
         }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index bd54087..3549e58 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -271,16 +271,12 @@ public class ClusterAggregator {
       // each buffer is an AggregationResult
       resultBuffers = SyncClientAdaptor.getAggrResult(client, request);
     } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        resultBuffers = syncDataClient.getAggrResult(request);
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
-      }
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      resultBuffers = syncDataClient.getAggrResult(request);
+      ClientUtils.putBackSyncClient(syncDataClient);
     }
     return resultBuffers;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index 4289e23..d7629f6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -85,18 +85,14 @@ public class RemoteGroupByExecutor implements 
GroupByExecutor {
             SyncClientAdaptor.getGroupByResult(
                 client, header, executorId, curStartTime, curEndTime);
       } else {
-        SyncDataClient syncDataClient = null;
-        try {
-          syncDataClient =
-              metaGroupMember
-                  .getClientProvider()
-                  .getSyncDataClient(source, 
RaftServer.getReadOperationTimeoutMS());
-          aggrBuffers =
-              syncDataClient.getGroupByResult(header, executorId, 
curStartTime, curEndTime);
-        } finally {
-          ClientUtils.putBackSyncClient(syncDataClient);
-        }
+        SyncDataClient syncDataClient =
+            metaGroupMember
+                .getClientProvider()
+                .getSyncDataClient(source, 
RaftServer.getReadOperationTimeoutMS());
+        aggrBuffers = syncDataClient.getGroupByResult(header, executorId, 
curStartTime, curEndTime);
+        ClientUtils.putBackSyncClient(syncDataClient);
       }
+
     } catch (TException e) {
       throw new IOException(e);
     } catch (InterruptedException e) {
@@ -133,18 +129,15 @@ public class RemoteGroupByExecutor implements 
GroupByExecutor {
             SyncClientAdaptor.peekNextNotNullValue(
                 client, header, executorId, nextStartTime, nextEndTime);
       } else {
-        SyncDataClient syncDataClient = null;
-        try {
-          syncDataClient =
-              metaGroupMember
-                  .getClientProvider()
-                  .getSyncDataClient(source, 
RaftServer.getReadOperationTimeoutMS());
-          aggrBuffer =
-              syncDataClient.peekNextNotNullValue(header, executorId, 
nextStartTime, nextEndTime);
-        } finally {
-          ClientUtils.putBackSyncClient(syncDataClient);
-        }
+        SyncDataClient syncDataClient =
+            metaGroupMember
+                .getClientProvider()
+                .getSyncDataClient(source, 
RaftServer.getReadOperationTimeoutMS());
+        aggrBuffer =
+            syncDataClient.peekNextNotNullValue(header, executorId, 
nextStartTime, nextEndTime);
+        ClientUtils.putBackSyncClient(syncDataClient);
       }
+
     } catch (TException e) {
       throw new IOException(e);
     } catch (InterruptedException e) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index d03a85d..066da1d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -258,23 +258,21 @@ public class ClusterLastQueryExecutor extends 
LastQueryExecutor {
     }
 
     private ByteBuffer lastSync(Node node, QueryContext context) throws 
TException {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        return syncDataClient.last(
-            new LastQueryRequest(
-                PartialPath.toStringList(seriesPaths),
-                dataTypeOrdinals,
-                context.getQueryId(),
-                queryPlan.getDeviceToMeasurements(),
-                group.getHeader(),
-                syncDataClient.getNode()));
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
-      }
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      ByteBuffer result =
+          syncDataClient.last(
+              new LastQueryRequest(
+                  PartialPath.toStringList(seriesPaths),
+                  dataTypeOrdinals,
+                  context.getQueryId(),
+                  queryPlan.getDeviceToMeasurements(),
+                  group.getHeader(),
+                  syncDataClient.getNode()));
+      ClientUtils.putBackSyncClient(syncDataClient);
+      return result;
     }
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 9b78175..2e239b9 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -657,16 +657,12 @@ public class ClusterReaderFactory {
               .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
       executorId = SyncClientAdaptor.getGroupByExecutor(client, request);
     } else {
-      SyncDataClient syncDataClient = null;
-      try {
-        syncDataClient =
-            metaGroupMember
-                .getClientProvider()
-                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
-        executorId = syncDataClient.getGroupByExecutor(request);
-      } finally {
-        ClientUtils.putBackSyncClient(syncDataClient);
-      }
+      SyncDataClient syncDataClient =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      executorId = syncDataClient.getGroupByExecutor(request);
+      ClientUtils.putBackSyncClient(syncDataClient);
     }
     return executorId;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 7cec0ea..1ed1fe6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
@@ -177,7 +176,7 @@ public class DataSourceInfo {
       }
       return newReaderId;
     } finally {
-      ClientUtils.putBackSyncClient(client);
+      client.putBack();
     }
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 2b68ef4..419f3cf 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.utils.SerializeUtils;
 
@@ -90,21 +89,20 @@ public class RemoteSeriesReaderByTimestamp implements 
IReaderByTimestamp {
   }
 
   private ByteBuffer fetchResultSync(long timestamp) throws IOException {
-    SyncDataClient curSyncClient = null;
     try {
-      curSyncClient = 
sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
-      return curSyncClient.fetchSingleSeriesByTimestamp(
-          sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
+      SyncDataClient curSyncClient =
+          sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
+      ByteBuffer buffer =
+          curSyncClient.fetchSingleSeriesByTimestamp(
+              sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
+      curSyncClient.putBack();
+      return buffer;
     } catch (TException e) {
       // try other node
       if (!sourceInfo.switchNode(true, timestamp)) {
         return null;
       }
       return fetchResultSync(timestamp);
-    } finally {
-      if (curSyncClient != null) {
-        ClientUtils.putBackSyncClient(curSyncClient);
-      }
     }
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index 2dcc1b7..f6c5a45 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -144,20 +143,19 @@ public class RemoteSimpleSeriesReader implements 
IPointReader {
   }
 
   private ByteBuffer fetchResultSync() throws IOException {
-    SyncDataClient curSyncClient = null;
     try {
-      curSyncClient = 
sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
-      return curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), 
sourceInfo.getReaderId());
+      SyncDataClient curSyncClient =
+          sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
+      ByteBuffer buffer =
+          curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), 
sourceInfo.getReaderId());
+      curSyncClient.putBack();
+      return buffer;
     } catch (TException e) {
       // try other node
       if (!sourceInfo.switchNode(false, lastTimestamp)) {
         return null;
       }
       return fetchResultSync();
-    } finally {
-      if (curSyncClient != null) {
-        ClientUtils.putBackSyncClient(curSyncClient);
-      }
     }
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index df5b30e..be05a4b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -313,15 +312,10 @@ public class ClientServer extends TSServiceImpl {
                       queriedNode, RaftServer.getReadOperationTimeoutMS());
               client.endQuery(header, coordinator.getThisNode(), queryId, 
handler);
             } else {
-              SyncDataClient syncDataClient = null;
-              try {
-                syncDataClient =
-                    coordinator.getSyncDataClient(
-                        queriedNode, RaftServer.getReadOperationTimeoutMS());
-                syncDataClient.endQuery(header, coordinator.getThisNode(), 
queryId);
-              } finally {
-                ClientUtils.putBackSyncClient(syncDataClient);
-              }
+              SyncDataClient syncDataClient =
+                  coordinator.getSyncDataClient(
+                      queriedNode, RaftServer.getReadOperationTimeoutMS());
+              syncDataClient.endQuery(header, coordinator.getThisNode(), 
queryId);
             }
           } catch (IOException | TException e) {
             logger.error("Cannot end query {} in {}", queryId, queriedNode);
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
index 58d929b..4d08a04 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.ClientUtils;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -74,8 +73,6 @@ public class DataClientProviderTest {
         client = provider.getSyncDataClient(node, 100);
       } catch (TException e) {
         Assert.fail(e.getMessage());
-      } finally {
-        ClientUtils.putBackSyncClient(client);
       }
       assertNotNull(client);
       
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 881db03..047b523 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -78,10 +78,6 @@ public class PublicBAOS extends ByteArrayOutputStream {
     count = 0;
   }
 
-  /**
-   * The synchronized keyword in this function is intentionally removed. For 
details, see
-   * https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173085039
-   */
   @Override
   public int size() {
     return count;

Reply via email to