This is an automated email from the ASF dual-hosted git repository. chaow pushed a commit to branch revert-2635-apache_master_0204_fix_client_pool_leak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8df7ce1e8f93bf73ab637ddc3a085f02ca263116 Author: chaow <[email protected]> AuthorDate: Sat Feb 20 09:56:20 2021 +0800 Revert "[IOTDB-1148]fix the client leak of client pool&use remote schema cache when check timeseries exist or not" --- .../iotdb/cluster/client/DataClientProvider.java | 7 +- .../apache/iotdb/cluster/metadata/CMManager.java | 125 +++++++-------------- .../apache/iotdb/cluster/metadata/MetaPuller.java | 26 ++--- .../iotdb/cluster/query/ClusterPlanExecutor.java | 57 ++++------ .../cluster/query/aggregate/ClusterAggregator.java | 16 +-- .../query/groupby/RemoteGroupByExecutor.java | 37 +++--- .../query/last/ClusterLastQueryExecutor.java | 32 +++--- .../cluster/query/reader/ClusterReaderFactory.java | 16 +-- .../iotdb/cluster/query/reader/DataSourceInfo.java | 3 +- .../reader/RemoteSeriesReaderByTimestamp.java | 16 ++- .../query/reader/RemoteSimpleSeriesReader.java | 14 +-- .../apache/iotdb/cluster/server/ClientServer.java | 14 +-- .../cluster/client/DataClientProviderTest.java | 3 - .../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 4 - 14 files changed, 135 insertions(+), 235 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java index d21e4e0..15e26a6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java @@ -26,7 +26,6 @@ import org.apache.iotdb.cluster.client.sync.SyncClientPool; import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.Node; -import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocolFactory; @@ -78,11 +77,7 @@ public class DataClientProvider { } /** - * IMPORTANT!!! After calling this function, the caller should make sure to call {@link - * org.apache.iotdb.cluster.utils.ClientUtils#putBackSyncClient(Client)} to put the client back - * into the client pool, otherwise there is a risk of client leakage. - * - * <p>Get a thrift client that will connect to "node" using the data port. + * Get a thrift client that will connect to "node" using the data port. * * @param node the node to be connected * @param timeout timeout threshold of connection diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index 50c3732..7ef9640 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -372,27 +372,6 @@ public class CMManager extends MManager { return super.getSeriesSchema(device, measurement); } - /** - * Check whether the path exists. - * - * @param path a full path or a prefix path - */ - @Override - public boolean isPathExist(PartialPath path) { - boolean localExist = super.isPathExist(path); - if (localExist) { - return true; - } - - // search the cache - cacheLock.readLock().lock(); - try { - return mRemoteMetaCache.containsKey(path); - } finally { - cacheLock.readLock().unlock(); - } - } - private static class RemoteMetaCache extends LRUCache<PartialPath, MeasurementMNode> { RemoteMetaCache(int cacheSize) { @@ -418,10 +397,6 @@ public class CMManager extends MManager { return null; } } - - public synchronized boolean containsKey(PartialPath key) { - return cache.containsKey(key); - } } /** @@ -671,18 +646,14 @@ public class CMManager extends MManager { SyncClientAdaptor.getUnregisteredMeasurements( client, partitionGroup.getHeader(), seriesList); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - result = - syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList); + ClientUtils.putBackSyncClient(syncDataClient); } + if (result != null) { return result; } @@ -855,21 +826,16 @@ public class CMManager extends MManager { .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); schemas = SyncClientAdaptor.pullTimeseriesSchema(client, request); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); - ByteBuffer buffer = pullSchemaResp.schemaBytes; - int size = buffer.getInt(); - schemas = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - schemas.add(TimeseriesSchema.deserializeFrom(buffer)); - } - } finally { - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); + ByteBuffer buffer = pullSchemaResp.schemaBytes; + int size = buffer.getInt(); + schemas = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + schemas.add(TimeseriesSchema.deserializeFrom(buffer)); } } @@ -1094,16 +1060,12 @@ public class CMManager extends MManager { .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias); + ClientUtils.putBackSyncClient(syncDataClient); } if (result != null) { @@ -1218,16 +1180,12 @@ public class CMManager extends MManager { .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - paths = syncDataClient.getAllDevices(header, pathsToQuery); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + paths = syncDataClient.getAllDevices(header, pathsToQuery); + ClientUtils.putBackSyncClient(syncDataClient); } return paths; } @@ -1543,20 +1501,17 @@ public class CMManager extends MManager { .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan); } else { - SyncDataClient syncDataClient = null; - try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - plan.serialize(dataOutputStream); - resultBinary = - syncDataClient.getAllMeasurementSchema( - group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + plan.serialize(dataOutputStream); + resultBinary = + syncDataClient.getAllMeasurementSchema( + group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); + ClientUtils.putBackSyncClient(syncDataClient); } return resultBinary; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java index e98b67c..74855fc 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java @@ -29,7 +29,6 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.PartialPath; @@ -221,21 +220,16 @@ public class MetaPuller { .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); schemas = SyncClientAdaptor.pullMeasurementSchema(client, request); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); - ByteBuffer buffer = pullSchemaResp.schemaBytes; - int size = buffer.getInt(); - schemas = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - schemas.add(MeasurementSchema.deserializeFrom(buffer)); - } - } finally { - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); + ByteBuffer buffer = pullSchemaResp.schemaBytes; + int size = buffer.getInt(); + schemas = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + schemas.add(MeasurementSchema.deserializeFrom(buffer)); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java index e76be6c..d525963 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java @@ -254,18 +254,15 @@ public class ClusterPlanExecutor extends PlanExecutor { SyncClientAdaptor.getPathCount( client, partitionGroup.getHeader(), pathsToQuery, level); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS()); - count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS()); + count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level); + ClientUtils.putBackSyncClient(syncDataClient); } + logger.debug( "{}: get path count of {} from {}, result {}", metaGroupMember.getName(), @@ -360,18 +357,14 @@ public class ClusterPlanExecutor extends PlanExecutor { SyncClientAdaptor.getNodeList( client, group.getHeader(), schemaPattern.getFullPath(), level); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - paths = - syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + paths = syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level); + ClientUtils.putBackSyncClient(syncDataClient); } + if (paths != null) { break; } @@ -384,6 +377,7 @@ public class ClusterPlanExecutor extends PlanExecutor { Thread.currentThread().interrupt(); } } + return PartialPath.fromStringList(paths); } @@ -473,18 +467,15 @@ public class ClusterPlanExecutor extends PlanExecutor { nextChildren = SyncClientAdaptor.getNextChildren(client, group.getHeader(), path.getFullPath()); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - nextChildren = - syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath()); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + nextChildren = + syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath()); + ClientUtils.putBackSyncClient(syncDataClient); } + if (nextChildren != null) { break; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java index bd54087..3549e58 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java @@ -271,16 +271,12 @@ public class ClusterAggregator { // each buffer is an AggregationResult resultBuffers = SyncClientAdaptor.getAggrResult(client, request); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - resultBuffers = syncDataClient.getAggrResult(request); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + resultBuffers = syncDataClient.getAggrResult(request); + ClientUtils.putBackSyncClient(syncDataClient); } return resultBuffers; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java index 4289e23..d7629f6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java @@ -85,18 +85,14 @@ public class RemoteGroupByExecutor implements GroupByExecutor { SyncClientAdaptor.getGroupByResult( client, header, executorId, curStartTime, curEndTime); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); - aggrBuffers = - syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); + aggrBuffers = syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime); + ClientUtils.putBackSyncClient(syncDataClient); } + } catch (TException e) { throw new IOException(e); } catch (InterruptedException e) { @@ -133,18 +129,15 @@ public class RemoteGroupByExecutor implements GroupByExecutor { SyncClientAdaptor.peekNextNotNullValue( client, header, executorId, nextStartTime, nextEndTime); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); - aggrBuffer = - syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); + aggrBuffer = + syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime); + ClientUtils.putBackSyncClient(syncDataClient); } + } catch (TException e) { throw new IOException(e); } catch (InterruptedException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java index d03a85d..066da1d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java @@ -258,23 +258,21 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor { } private ByteBuffer lastSync(Node node, QueryContext context) throws TException { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - return syncDataClient.last( - new LastQueryRequest( - PartialPath.toStringList(seriesPaths), - dataTypeOrdinals, - context.getQueryId(), - queryPlan.getDeviceToMeasurements(), - group.getHeader(), - syncDataClient.getNode())); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + ByteBuffer result = + syncDataClient.last( + new LastQueryRequest( + PartialPath.toStringList(seriesPaths), + dataTypeOrdinals, + context.getQueryId(), + queryPlan.getDeviceToMeasurements(), + group.getHeader(), + syncDataClient.getNode())); + ClientUtils.putBackSyncClient(syncDataClient); + return result; } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index 9b78175..2e239b9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -657,16 +657,12 @@ public class ClusterReaderFactory { .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); executorId = SyncClientAdaptor.getGroupByExecutor(client, request); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - executorId = syncDataClient.getGroupByExecutor(request); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + executorId = syncDataClient.getGroupByExecutor(request); + ClientUtils.putBackSyncClient(syncDataClient); } return executorId; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java index 7cec0ea..1ed1fe6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java @@ -29,7 +29,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.TimeFilter; @@ -177,7 +176,7 @@ public class DataSourceInfo { } return newReaderId; } finally { - ClientUtils.putBackSyncClient(client); + client.putBack(); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java index 2b68ef4..419f3cf 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java @@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; -import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.utils.SerializeUtils; @@ -90,21 +89,20 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp { } private ByteBuffer fetchResultSync(long timestamp) throws IOException { - SyncDataClient curSyncClient = null; try { - curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); - return curSyncClient.fetchSingleSeriesByTimestamp( - sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp); + SyncDataClient curSyncClient = + sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); + ByteBuffer buffer = + curSyncClient.fetchSingleSeriesByTimestamp( + sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp); + curSyncClient.putBack(); + return buffer; } catch (TException e) { // try other node if (!sourceInfo.switchNode(true, timestamp)) { return null; } return fetchResultSync(timestamp); - } finally { - if (curSyncClient != null) { - ClientUtils.putBackSyncClient(curSyncClient); - } } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java index 2dcc1b7..f6c5a45 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java @@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; -import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -144,20 +143,19 @@ public class RemoteSimpleSeriesReader implements IPointReader { } private ByteBuffer fetchResultSync() throws IOException { - SyncDataClient curSyncClient = null; try { - curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); - return curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getReaderId()); + SyncDataClient curSyncClient = + sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); + ByteBuffer buffer = + curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getReaderId()); + curSyncClient.putBack(); + return buffer; } catch (TException e) { // try other node if (!sourceInfo.switchNode(false, lastTimestamp)) { return null; } return fetchResultSync(); - } finally { - if (curSyncClient != null) { - ClientUtils.putBackSyncClient(curSyncClient); - } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java index df5b30e..be05a4b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java @@ -31,7 +31,6 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -313,15 +312,10 @@ public class ClientServer extends TSServiceImpl { queriedNode, RaftServer.getReadOperationTimeoutMS()); client.endQuery(header, coordinator.getThisNode(), queryId, handler); } else { - SyncDataClient syncDataClient = null; - try { - syncDataClient = - coordinator.getSyncDataClient( - queriedNode, RaftServer.getReadOperationTimeoutMS()); - syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); - } finally { - ClientUtils.putBackSyncClient(syncDataClient); - } + SyncDataClient syncDataClient = + coordinator.getSyncDataClient( + queriedNode, RaftServer.getReadOperationTimeoutMS()); + syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); } } catch (IOException | TException e) { logger.error("Cannot end query {} in {}", queryId, queriedNode); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java index 58d929b..4d08a04 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java @@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.common.TestUtils; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.Node; -import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol.Factory; @@ -74,8 +73,6 @@ public class DataClientProviderTest { client = provider.getSyncDataClient(node, 100); } catch (TException e) { Assert.fail(e.getMessage()); - } finally { - ClientUtils.putBackSyncClient(client); } assertNotNull(client); ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java index 881db03..047b523 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java @@ -78,10 +78,6 @@ public class PublicBAOS extends ByteArrayOutputStream { count = 0; } - /** - * The synchronized keyword in this function is intentionally removed. For details, see - * https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173085039 - */ @Override public int size() { return count;
