ptlrs commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3030959630
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -267,13 +255,16 @@ public void close() {
return;
}
- for (ManagedChannel channel : channels.values()) {
- channel.shutdown();
+ for (ChannelInfo channelInfo : dnChannelInfoMap.values()) {
+ channelInfo.getChannel().shutdown();
}
final long maxWaitNanos =
TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
long deadline = System.nanoTime() + maxWaitNanos;
- List<ManagedChannel> nonTerminatedChannels = new
ArrayList<>(channels.values());
+ List<ManagedChannel> nonTerminatedChannels = dnChannelInfoMap.values()
+ .stream()
+ .map(ChannelInfo::getChannel)
+ .collect(Collectors.toList());
Review Comment:
Fixed
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -161,49 +161,37 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
- if (isConnected(dn)) {
- return;
- }
- // read port from the data node, on failure use default configured port
- int port = dn.getStandalonePort().getValue();
- if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
- }
- final int finalPort = port;
-
- LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
-
- channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
- asyncStubs.remove(dnId);
- return null; // removes from channels map
+ dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
+ // channel is absent or stale
+ if (channelInfo == null || channelInfo.isChannelInactive()) {
+ LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn,
pipeline.getNodes());
+ try {
+ return generateNewChannel(dn);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- return channel;
+ // channel is present and active
+ return channelInfo;
});
+ }
- ManagedChannel channel;
- try {
- channel = channels.computeIfAbsent(dn.getID(), dnId -> {
- try {
- return createChannel(dn, finalPort).build();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- } catch (RuntimeException e) {
- LOG.error("Failed to create channel to datanode {}", dn, e);
- throw new IOException(e.getCause());
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]