GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r189821240
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +92,34 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
                     }
                 }
             }
         }
     
    -    private void reconnectClient(DRPCInvocationsClient client) {
    -        if (client instanceof DRPCInvocationsClient) {
    -            try {
    -                LOG.info("reconnecting... ");
    -                client.reconnectClient(); //Blocking call
    -            } catch (TException e2) {
    -                LOG.error("Failed to connect to DRPC server", e2);
    +    private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
    +        DistributedRPCInvocations.Iface client;
    +        if (local) {
    +            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
    +        } else {
    +            List server = new ArrayList() {
    +                {
    +                    add(host);
    +                    add(port);
    +                }
    +            };
    +            if (!_clients.containsKey(server)) {
    +                try {
    +                    DRPCInvocationsClient oldClient = _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
    --- End diff --
    
    With this change we lose the client caching. Instead, can we keep the
    cache and invalidate an entry only when we find that its client is broken?
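
A rough sketch of what "invalidate on failure" could look like, not a drop-in patch. `ResultClient` and `ClientCache` are hypothetical names standing in for `DRPCInvocationsClient` and the `_clients` map, and the factory parameter is an assumption made so the example runs without Storm on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

// Hypothetical stand-in for DRPCInvocationsClient; not the real Storm class.
interface ResultClient {
    void result(String id, String value) throws Exception;
    void close();
}

// Keeps one client per (host, port) and drops an entry only when the
// caller reports that this particular client instance is broken.
final class ClientCache {
    private final ConcurrentMap<List<Object>, ResultClient> clients = new ConcurrentHashMap<>();
    private final BiFunction<String, Integer, ResultClient> factory;

    ClientCache(BiFunction<String, Integer, ResultClient> factory) {
        this.factory = factory;
    }

    private static List<Object> key(String host, int port) {
        return Arrays.<Object>asList(host, port);
    }

    // Returns the cached client, creating it only on a cache miss.
    ResultClient get(String host, int port) {
        return clients.computeIfAbsent(key(host, port), k -> factory.apply(host, port));
    }

    // Removes the entry only if it still maps to the broken instance, so a
    // client that another thread has already replaced is left untouched.
    void invalidate(String host, int port, ResultClient broken) {
        if (clients.remove(key(host, port), broken)) {
            broken.close();
        }
    }
}
```

In `execute`, the catch block would call `invalidate(host, port, client)` and then `get(host, port)` before retrying, so healthy clients stay cached across tuples.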

