lokeshj1703 commented on a change in pull request #714: HDDS-1406. Avoid usage of commonPool in RatisPipelineUtils. URL: https://github.com/apache/hadoop/pull/714#discussion_r284614769
########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java ########## @@ -133,7 +173,86 @@ public Pipeline create(ReplicationFactor factor, .build(); } + + @Override + public void shutdown() { + forkJoinPool.shutdownNow(); + } + protected void initializePipeline(Pipeline pipeline) throws IOException { - RatisPipelineUtils.createPipeline(pipeline, conf); + createPipeline(pipeline); + } + + /** + * Sends ratis command to create pipeline on all the datanodes. + * + * @param pipeline - Pipeline to be created + * @throws IOException if creation fails + */ + public void createPipeline(Pipeline pipeline) + throws IOException { + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); + callRatisRpc(pipeline.getNodes(), + (raftClient, peer) -> { + RaftClientReply reply = raftClient.groupAdd(group, peer.getId()); + if (reply == null || !reply.isSuccess()) { + String msg = "Pipeline initialization failed for pipeline:" + + pipeline.getId() + " node:" + peer.getId(); + LOG.error(msg); + throw new IOException(msg); + } + }); + } + + private void callRatisRpc(List<DatanodeDetails> datanodes, + CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc) + throws IOException { + if (datanodes.isEmpty()) { + return; + } + + final String rpcType = conf + .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); + final List< IOException > exceptions = + Collections.synchronizedList(new ArrayList<>()); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(conf); + final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new + SecurityConfig(conf)); + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(conf); + try { + forkJoinPool.submit(() -> { Review comment: Can we please verify that none of the threads are waiting for the parallel stream call to finish? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org