Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2651#discussion_r189821240 --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java --- @@ -103,21 +92,34 @@ public void execute(Tuple input) { LOG.error("Failed to return results to DRPC server", tex); _collector.fail(input); } - reconnectClient((DRPCInvocationsClient) client); + client = getDRPCClient(host, port); } } } } - private void reconnectClient(DRPCInvocationsClient client) { - if (client instanceof DRPCInvocationsClient) { - try { - LOG.info("reconnecting... "); - client.reconnectClient(); //Blocking call - } catch (TException e2) { - LOG.error("Failed to connect to DRPC server", e2); + private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) { + DistributedRPCInvocations.Iface client; + if (local) { + client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host); + } else { + List server = new ArrayList() { + { + add(host); + add(port); + } + }; + if (!_clients.containsKey(server)) { + try { + DRPCInvocationsClient oldClient = _clients.put(server, new DRPCInvocationsClient(_conf, host, port)); --- End diff -- Now it loses the cache functionality. Instead of this way, can we invalidate cache when we find that the client is broken?
---