[ https://issues.apache.org/jira/browse/TINKERPOP-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607421#comment-16607421 ]
Simone Rondelli commented on TINKERPOP-2030:
--------------------------------------------

Hi Stephen,

No problem. I think the problem lies in these lines: calling {{oldKeepAliveFuture.cancel(true);}} does not actually cancel the task.

{code:java}
// try to cancel the old future if it's still un-executed - no need to ping since a new write has come
// through on the connection
if (oldKeepAliveFuture != null) oldKeepAliveFuture.cancel(true);
{code}

The problem with the proposed fix is that the keep-alive would then be sent on every interval regardless, even if a write was sent recently. An alternative approach would be to have the {{write()}} method update a variable (e.g. {{lastWriteTime}}) holding the time of the last write, and a thread with a loop that periodically checks:

{code:java}
if (currentTime - lastWriteTime >= keepAliveInterval) {
    channel.writeAndFlush(channelizer.createKeepAliveMessage());
}
{code}

> KeepAlive task executed for every Connection.write call
> -------------------------------------------------------
>
>                 Key: TINKERPOP-2030
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-2030
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: driver
>    Affects Versions: 3.3.2
>            Reporter: Simone Rondelli
>            Priority: Major
>
> The Gremlin Driver communicates with the server via {{Connection.write()}}.
> The {{write()}} method ends with logic that schedules a task to keep the connection alive; by default the task first runs after 30 minutes and then every 30 minutes.
> {code:java}
> // try to keep the connection alive if the channel allows such things - websockets will
> if (channelizer.supportsKeepAlive() && keepAliveInterval > 0) {
>     final ScheduledFuture oldKeepAliveFuture = keepAliveFuture.getAndSet(cluster.executor().scheduleAtFixedRate(() -> {
>         logger.debug("Request sent to server to keep {} alive", thisConnection);
>         try {
>             channel.writeAndFlush(channelizer.createKeepAliveMessage());
>         } catch (Exception ex) {
>             // will just log this for now - a future real request can be responsible for the failure that
>             // marks the host as dead. this also may not mean the host is actually dead. more robust handling
>             // is in play for real requests, not this simple ping
>             logger.warn(String.format("Keep-alive did not succeed on %s", thisConnection), ex);
>         }
>     }, keepAliveInterval, keepAliveInterval, TimeUnit.MILLISECONDS));
>
>     // try to cancel the old future if it's still un-executed - no need to ping since a new write has come
>     // through on the connection
>     if (oldKeepAliveFuture != null) oldKeepAliveFuture.cancel(true);
> }
> {code}
>
> The problem with this is that on every call of the {{write()}} method, which basically means on every query executed, the {{Connection}} schedules a KeepAlive task that won't run for at least 30 minutes. This causes the {{Cluster.executor()}} queue to fill up with tasks waiting for completion.
> One possible fix would be to avoid scheduling this KeepAlive task more than once per connection:
> {code:java}
> final class Connection {
>     ...
>     private final AtomicBoolean keepAliveInitialized = new AtomicBoolean(false);
>
>     public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> future) {
>         ...
>
>         // FIX HERE: with keepAliveInitialized.compareAndSet(false, true) we ensure the keepAlive task is scheduled only once per connection.
>         if (channelizer.supportsKeepAlive() && keepAliveInterval > 0 && keepAliveInitialized.compareAndSet(false, true)) {
>             ...
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
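Why the cancelled futures still pile up can be checked against the JDK's {{ScheduledThreadPoolExecutor}}: {{cancel(true)}} marks a delayed task as cancelled so it will never run, but unless {{setRemoveOnCancelPolicy(true)}} has been called the task stays in the work queue until its delay elapses (30 minutes here). The sketch below mimics the per-write pattern from {{Connection.write()}}. It assumes {{cluster.executor()}} is backed by a plain {{ScheduledThreadPoolExecutor}} with the default policy, which is an assumption not confirmed above; the class and method names are hypothetical.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: cancelling a long-delay task does not, by default, remove it
// from a ScheduledThreadPoolExecutor's work queue.
final class CancelledTaskRetention {

    // Simulates `writes` calls to write(): each schedules a new 30-minute keep-alive
    // stand-in and cancels the previous one. Returns how many tasks remain queued.
    static int queuedAfterWrites(int writes, boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?> previous = null;
            for (int i = 0; i < writes; i++) {
                ScheduledFuture<?> next = executor.scheduleAtFixedRate(
                        () -> { }, 30, 30, TimeUnit.MINUTES);
                if (previous != null) previous.cancel(true); // marks as cancelled only
                previous = next;
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy: " + queuedAfterWrites(1000, false));
        System.out.println("removeOnCancel: " + queuedAfterWrites(1000, true));
    }
}
```

With the default policy all 1000 tasks (999 cancelled plus the live one) remain queued; with {{setRemoveOnCancelPolicy(true)}} only the live task remains. That policy would bound the queue, but each write would still allocate and schedule a fresh task.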
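The {{lastWriteTime}} alternative from the comment can be sketched as one periodic check per connection, so writes only record a timestamp and never touch the executor. {{IdleKeepAlive}}, its methods, the injectable clock, and the choice to count the ping itself as a write are all hypothetical; the {{Runnable}} stands in for {{channel.writeAndFlush(channelizer.createKeepAliveMessage())}}.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical per-connection helper: writes only record a timestamp, and a single
// periodic task decides whether a keep-alive ping is actually needed.
final class IdleKeepAlive {
    private final long keepAliveIntervalMillis;
    private final LongSupplier clockMillis;   // e.g. System::currentTimeMillis; injectable for testing
    private final Runnable sendKeepAlive;     // stand-in for channel.writeAndFlush(channelizer.createKeepAliveMessage())
    private final AtomicLong lastWriteTime;   // updated by writes, read by the periodic check

    IdleKeepAlive(long keepAliveIntervalMillis, LongSupplier clockMillis, Runnable sendKeepAlive) {
        this.keepAliveIntervalMillis = keepAliveIntervalMillis;
        this.clockMillis = clockMillis;
        this.sendKeepAlive = sendKeepAlive;
        this.lastWriteTime = new AtomicLong(clockMillis.getAsLong());
    }

    // Called from write(): cheap, and schedules nothing on the executor.
    void recordWrite() {
        lastWriteTime.set(clockMillis.getAsLong());
    }

    // Body of the periodic check; returns true if a ping was sent.
    boolean checkIdle() {
        long now = clockMillis.getAsLong();
        if (now - lastWriteTime.get() >= keepAliveIntervalMillis) {
            sendKeepAlive.run();
            // Design choice: count the ping as a write so an idle connection is pinged once
            // per interval; max() avoids moving the timestamp back if a write raced with us.
            lastWriteTime.accumulateAndGet(now, Math::max);
            return true;
        }
        return false;
    }

    // Schedules the single periodic check (call once, e.g. when the connection opens).
    ScheduledFuture<?> start(ScheduledExecutorService executor, long checkPeriodMillis) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                checkIdle();
            } catch (RuntimeException ex) {
                // An exception escaping a fixed-rate task suppresses all later runs,
                // so it is swallowed here (the original code logs a warning instead).
            }
        }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
    }
}
```

In this design {{write()}} would call {{recordWrite()}} and {{start(...)}} would run once per connection, so the executor holds one task per connection however many queries are sent. A check period shorter than {{keepAliveInterval}} bounds how late a ping can be, at the cost of waking up more often.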