TINKERPOP-932 Added "force" option on session close.

Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/4ff93ab1
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/4ff93ab1
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/4ff93ab1

Branch: refs/heads/TINKERPOP-1490
Commit: 4ff93ab18a2e66a4a1283af3b814854680cd6267
Parents: d7fb966
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Oct 24 16:06:38 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Nov 2 11:18:36 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  2 +
 docs/src/dev/provider/index.asciidoc            |  5 ++
 .../upgrade/release-3.2.x-incubating.asciidoc   | 20 +++++
 .../apache/tinkerpop/gremlin/driver/Client.java | 24 ++++++
 .../tinkerpop/gremlin/driver/Connection.java    |  7 +-
 .../apache/tinkerpop/gremlin/driver/Tokens.java |  1 +
 .../tinkerpop/gremlin/server/Settings.java      |  1 -
 .../gremlin/server/op/session/Session.java      | 83 ++++++++++++++------
 .../server/op/session/SessionOpProcessor.java   | 21 ++++-
 .../GremlinServerSessionIntegrateTest.java      | 59 ++++++++++++--
 10 files changed, 189 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 328bc9d..882f6e9 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Added a `force` option for killing sessions without waiting for transaction close or timeout of a currently running job or multiple jobs.
+* Deprecated `Session.kill()` and `Session.manualKill()`.
 * Added `choose(predicate,traversal)` and `choose(traversal,traversal)` to effect if/then-semantics (no else). Equivalent to `choose(x,y,identity())`.
 * `SparkGraphComputer` no longer starts a worker iteration if the worker's partition is empty.
 * Added `ProjectStep.getProjectKeys()` for strategies that rely on such information.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/docs/src/dev/provider/index.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/dev/provider/index.asciidoc b/docs/src/dev/provider/index.asciidoc
index 00ac4a9..c2f9f20 100644
--- a/docs/src/dev/provider/index.asciidoc
+++ b/docs/src/dev/provider/index.asciidoc
@@ -901,6 +901,11 @@ to send an alias pair with key of "g" and value of "g2" and thus allow the scrip
 |=========================================================
 |Key |Type |Description
 |session |String | *Required* The session identifier for the session to close.
+|force |Boolean | Determines if the session should be force closed when the client is closed. Force closing will not
+attempt to close open transactions from existing running jobs and leave it to the underlying graph to decide how to
+proceed with those orphaned transactions. Setting this to `true` tends to lead to faster close operation and release
+of resources which can be desirable if Gremlin Server has a long session timeout and a long script evaluation timeout
+as attempts to close long run jobs can occur more rapidly. If not provided, this value is `false`.
 |=========================================================
 
 Traversal OpProcessor

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index f121aa4..142e349 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -71,6 +71,26 @@ following fields were deprecated:
 * `OptIn.SUITE_PROCESS_PERFORMANCE`
 * `OptIn.SUITE_STRUCTURE_PERFORMANCE`
 
+Drivers Providers
+^^^^^^^^^^^^^^^^^
+
+Force Close
++++++++++++
+
+Closing a session will first attempt a proper close of any open transactions. A problem can occur, however, if there is
+a long run job (e.g. an OLAP-based traversal) executing, as that job will block the calls to close the transactions.
+By exercising the option to do a "forced close" the session will skip trying to close the transactions and just
+attempt to interrupt the long run job. By not closing transactions, the session leaves it up to the underlying graph
+database to sort out how it will deal with those orphaned transactions. On the positive side though (for those graphs
+which do that well), long run jobs have the opportunity to be cancelled without waiting for a timeout of the job itself
+which will allow resources to be released earlier.
+
+The "force" argument is passed on the "close" message and is a boolean value. This is an optional argument to "close"
+and defaults to `false`.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-932[TINKERPOP-932],
+link:http://tinkerpop.apache.org/docs/current/dev/provider/#_session_opprocessor[Provider Documentation - Session OpProcessor]
+
 TinkerPop 3.2.3
 ---------------
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index bd397a1..2c448dc 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -804,10 +804,12 @@ public abstract class Client {
     public static class SessionSettings {
         private final boolean manageTransactions;
         private final String sessionId;
+        private final boolean forceClosed;
 
         private SessionSettings(final Builder builder) {
             manageTransactions = builder.manageTransactions;
             sessionId = builder.sessionId;
+            forceClosed = builder.forceClosed;
         }
 
         /**
@@ -824,6 +826,14 @@ public abstract class Client {
             return sessionId;
         }
 
+        /**
+         * Determines if the session will be force closed. See {@link Builder#forceClosed(boolean)} for more details
+         * on what that means.
+         */
+        public boolean isForceClosed() {
+            return forceClosed;
+        }
+
         public static SessionSettings.Builder build() {
             return new SessionSettings.Builder();
         }
@@ -831,6 +841,7 @@ public abstract class Client {
         public static class Builder {
             private boolean manageTransactions = false;
             private String sessionId = UUID.randomUUID().toString();
+            private boolean forceClosed = false;
 
             private Builder() {}
 
@@ -854,6 +865,19 @@ public abstract class Client {
                 return this;
             }
 
+            /**
+             * Determines if the session should be force closed when the client is closed. Force closing will not
+             * attempt to close open transactions from existing running jobs and leave it to the underlying graph to
+             * decide how to proceed with those orphaned transactions. Setting this to {@code true} tends to lead to
+             * faster close operation which can be desirable if Gremlin Server has a long session timeout and a long
+             * script evaluation timeout as attempts to close long run jobs can occur more rapidly. By default, this
+             * value is {@code false}.
+             */
+            public Builder forceClosed(final boolean forced) {
+                this.forceClosed = forced;
+                return this;
+            }
+
             public SessionSettings create() {
                 return new SessionSettings(this);
             }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 2a68032..571d906 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -306,9 +306,12 @@ final class Connection {
         // be called once. once shutdown is initiated, it shouldn't be executed a second time or else it sends more
         // messages at the server and leads to ugly log messages over there.
         if (shutdownInitiated.compareAndSet(false, true)) {
+            // maybe this should be delegated back to the Client implementation??? kinda weird to instanceof here.....
             if (client instanceof Client.SessionedClient) {
-                // maybe this should be delegated back to the Client implementation???
-                final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE)).create();
+                final boolean forceClose = client.getSettings().getSession().get().isForceClosed();
+                final RequestMessage closeMessage = client.buildMessage(
+                        RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
+
                 final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
                 write(closeMessage, closed);
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index 25c79af..fb577d7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -70,6 +70,7 @@ public final class Tokens {
     public static final String ARGS_BATCH_SIZE = "batchSize";
     public static final String ARGS_BINDINGS = "bindings";
     public static final String ARGS_ALIASES = "aliases";
+    public static final String ARGS_FORCE = "force";
     public static final String ARGS_GREMLIN = "gremlin";
     public static final String ARGS_LANGUAGE = "language";
     public static final String ARGS_SCRIPT_EVAL_TIMEOUT = "scriptEvaluationTimeout";

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 8b9fe89..4bb2089 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.server;
 
 import io.netty.handler.ssl.SslContext;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
-import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
 import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index c9bc7c1..8f4da66 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -52,6 +52,7 @@ public class Session {
     private final String session;
     private final ScheduledExecutorService scheduledExecutorService;
     private final long configuredSessionTimeout;
+    private final long configuredPerGraphCloseTimeout;
 
     private AtomicBoolean killing = new AtomicBoolean(false);
     private AtomicReference<ScheduledFuture> kill = new AtomicReference<>();
@@ -85,7 +86,10 @@ public class Session {
         final Settings.ProcessorSettings processorSettings = this.settings.processors.stream()
                 .filter(p -> p.className.equals(SessionOpProcessor.class.getCanonicalName()))
                 .findAny().orElse(SessionOpProcessor.DEFAULT_SETTINGS);
-        this.configuredSessionTimeout = Long.parseLong(processorSettings.config.get(SessionOpProcessor.CONFIG_SESSION_TIMEOUT).toString());
+        this.configuredSessionTimeout = Long.parseLong(processorSettings.config.getOrDefault(
+                SessionOpProcessor.CONFIG_SESSION_TIMEOUT, SessionOpProcessor.DEFAULT_SESSION_TIMEOUT).toString());
+        this.configuredPerGraphCloseTimeout = Long.parseLong(processorSettings.config.getOrDefault(
+                SessionOpProcessor.CONFIG_PER_GRAPH_CLOSE_TIMEOUT, SessionOpProcessor.DEFAULT_PER_GRAPH_CLOSE_TIMEOUT).toString());
 
         this.gremlinExecutor = initializeGremlinExecutor().create();
     }
@@ -129,17 +133,43 @@ public class Session {
 
     /**
      * Stops the session with call to {@link #kill()} but also stops the session expiration call which ensures that
-     * the session is only killed once.
+     * the session is only killed once. Calls {@link #manualKill(boolean)} with {@code false}.
+     *
+     * @deprecated As of release 3.2.4, replaced by {@link #manualKill(boolean)}.
      */
+    @Deprecated
     public void manualKill() {
+        manualKill(false);
+    }
+
+    /**
+     * Stops the session with call to {@link #kill()} but also stops the session expiration call which ensures that
+     * the session is only killed once. See {@link #kill(boolean)} for information on what "forcing" the session
+     * kill will mean.
+     */
+    public void manualKill(final boolean force) {
         kill.get().cancel(true);
-        kill();
+        kill(force);
     }
 
     /**
-     * Kills the session and rollback any uncommitted changes on transactional graphs.
+     * Kills the session and rollback any uncommitted changes on transactional graphs. Same as calling
+     * {@link #kill(boolean)} with {@code false}.
+     *
+     * @deprecated As of release 3.2.4, replaced by {@link #kill(boolean)}.
      */
+    @Deprecated
     public synchronized void kill() {
+        kill(false);
+    }
+
+    /**
+     * Kills the session and rollback any uncommitted changes on transactional graphs. When "force" closed, the
+     * session won't bother to try to submit transaction close commands. It will be up to the underlying graph
+     * implementation to determine how it will clean up orphaned transactions. The force will try to cancel scheduled
+     * jobs and interrupt any currently running ones. Interruption is not guaranteed, but an attempt will be made.
+     */
+    public synchronized void kill(final boolean force) {
         killing.set(true);
 
         // if the session has already been removed then there's no need to do this process again. it's possible that
@@ -147,26 +177,33 @@ public class Session {
         // kill() from being called more than once
         if (!sessions.containsKey(session)) return;
 
-        // when the session is killed open transaction should be rolled back
-        graphManager.getGraphs().entrySet().forEach(kv -> {
-            final Graph g = kv.getValue();
-            if (g.features().graph().supportsTransactions()) {
-                // have to execute the rollback in the executor because the transaction is associated with
-                // that thread of execution from this session
-                try {
-                    executor.submit(() -> {
-                        if (g.tx().isOpen()) {
-                            logger.info("Rolling back open transactions on {} before killing session: {}", kv.getKey(), session);
-                            g.tx().rollback();
-                        }
-                    }).get(30000, TimeUnit.MILLISECONDS);
-                } catch (Exception ex) {
-                    logger.warn("An error occurred while attempting rollback when closing session: " + session, ex);
+        if (!force) {
+            // when the session is killed open transaction should be rolled back
+            graphManager.getGraphs().entrySet().forEach(kv -> {
+                final Graph g = kv.getValue();
+                if (g.features().graph().supportsTransactions()) {
+                    // have to execute the rollback in the executor because the transaction is associated with
+                    // that thread of execution from this session
+                    try {
+                        executor.submit(() -> {
+                            if (g.tx().isOpen()) {
+                                logger.info("Rolling back open transactions on {} before killing session: {}", kv.getKey(), session);
+                                g.tx().rollback();
+                            }
+                        }).get(configuredPerGraphCloseTimeout, TimeUnit.MILLISECONDS);
+                    } catch (Exception ex) {
+                        logger.warn(String.format("An error occurred while attempting rollback on %s when closing session: %s", kv.getKey(), session), ex);
+                    }
                 }
-            }
-        });
-
-        // prevent any additional requests from processing now that the mass rollback has been completed
+            });
+        } else {
+            logger.info("Skipped attempt to close open graph transactions on {} - close was forced", session);
+        }
+
+        // prevent any additional requests from processing. if the kill was not "forced" then jobs were scheduled to
+        // try to rollback open transactions. those jobs either timed-out or completed successfully. either way, no
+        // additional jobs will be allowed, running jobs will be cancelled (if possible) and any scheduled jobs will
+        // be cancelled
         executor.shutdownNow();
 
         sessions.remove(session);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index bec0c55..651bdb0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -66,14 +66,26 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
     }
 
     /**
-     * Configuration setting for how long a session will be available before it timesout.
+     * Configuration setting for how long a session will be available before it times out.
      */
     public static final String CONFIG_SESSION_TIMEOUT = "sessionTimeout";
 
     /**
+     * Configuration setting for how long to wait in milliseconds for each configured graph to close any open
+     * transactions when the session is killed.
+     */
+    public static final String CONFIG_PER_GRAPH_CLOSE_TIMEOUT = "perGraphCloseTimeout";
+
+    /**
      * Default timeout for a session is eight hours.
      */
-    public static final long DEFAULT_SESSION_TIMEOUT = 28800000l;
+    public static final long DEFAULT_SESSION_TIMEOUT = 28800000;
+
+    /**
+     * Default amount of time to wait in milliseconds for each configured graph to close any open transactions when
+     * the session is killed.
+     */
+    public static final long DEFAULT_PER_GRAPH_CLOSE_TIMEOUT = 10000;
 
     static final Settings.ProcessorSettings DEFAULT_SETTINGS = new Settings.ProcessorSettings();
 
@@ -81,6 +93,7 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
         DEFAULT_SETTINGS.className = SessionOpProcessor.class.getCanonicalName();
         DEFAULT_SETTINGS.config = new HashMap<String, Object>() {{
             put(CONFIG_SESSION_TIMEOUT, DEFAULT_SESSION_TIMEOUT);
+            put(CONFIG_PER_GRAPH_CLOSE_TIMEOUT, DEFAULT_PER_GRAPH_CLOSE_TIMEOUT);
         }};
     }
 
@@ -106,6 +119,8 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
                 throw new OpProcessorException(msg, ResponseMessage.build(requestMessage).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
             }
 
+            final boolean force = requestMessage.<Boolean>optionalArgs(Tokens.ARGS_FORCE).orElse(false);
+
             return Optional.of(ctx -> {
                 // validate the session is present and then remove it if it is.
                 final Session sessionToClose = sessions.get(requestMessage.getArgs().get(Tokens.ARGS_SESSION).toString());
@@ -114,7 +129,7 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
                     throw new OpProcessorException(msg, ResponseMessage.build(requestMessage).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
                 }
 
-                sessionToClose.manualKill();
+                sessionToClose.manualKill(force);
             });
         } else {
             return Optional.empty();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4ff93ab1/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 3c1fef9..c384742 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -49,6 +49,7 @@ import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -84,47 +85,95 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInt
         final String nameOfTest = name.getMethodName();
         switch (nameOfTest) {
             case "shouldHaveTheSessionTimeout":
-            case "shouldCloseSessionOnceOnRequest":
                 settings.processors.clear();
                 final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings();
                 processorSettings.className = SessionOpProcessor.class.getCanonicalName();
                 processorSettings.config = new HashMap<>();
                 processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
                 settings.processors.add(processorSettings);
-
+                break;
+            case "shouldBlockAdditionalRequestsDuringForceClose":
+            case "shouldCloseSessionOnceOnRequest":
+                clearNeo4j(settings);
                 Logger.getRootLogger().setLevel(Level.INFO);
                 break;
             case "shouldEnsureSessionBindingsAreThreadSafe":
                 settings.threadPoolWorker = 2;
                 break;
+            case "shouldBlockAdditionalRequestsDuringClose":
             case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
             case "shouldExecuteInSessionWithTransactionManagement":
             case "shouldRollbackOnEvalExceptionForManagedTransaction":
-                deleteDirectory(new File("/tmp/neo4j"));
-                settings.graphs.put("graph", "conf/neo4j-empty.properties");
+                clearNeo4j(settings);
                 break;
         }
 
         return settings;
     }
 
+    private static void clearNeo4j(Settings settings) {
+        deleteDirectory(new File("/tmp/neo4j"));
+        settings.graphs.put("graph", "conf/neo4j-empty.properties");
+    }
+
     @Test
     public void shouldBlockAdditionalRequestsDuringClose() throws Exception {
+        assumeNeo4jIsPresent();
+
         // this is sorta cobbled together a bit given limits/rules about how you can use Cluster/Client instances.
         // basically, we need one to submit the long run job and one to do the close operation that will cancel the
         // long run job. it is probably possible to do this with some low-level message manipulation but that's
         // probably not necessary
         final Cluster cluster1 = Cluster.build().create();
         final Client client1 = cluster1.connect(name.getMethodName());
-        client1.submit("1+1").all().join();
+        client1.submit("graph.addVertex()").all().join();
         final Cluster cluster2 = Cluster.build().create();
         final Client client2 = cluster2.connect(name.getMethodName());
         client2.submit("1+1").all().join();
 
+        final ResultSet rs = client1.submit("Thread.sleep(3000);1+1");
+
+        // close while the previous request is still executing
+        client2.close();
+
+        assertEquals(2, rs.all().join().get(0).getInt());
+
+        client1.close();
+
+        cluster1.close();
+        cluster2.close();
+
+        // triggered an error during close and since we didn't force close, the attempt to close the transaction
+        // is made
+        assertThat(recordingAppender.getMessages(), hasItem("INFO - Rolling back open transactions on graph before killing session: " + name.getMethodName() + "\n"));
+
+    }
+
+    @Test
+    public void shouldBlockAdditionalRequestsDuringForceClose() throws Exception {
+        assumeNeo4jIsPresent();
+
+        // this is sorta cobbled together a bit given limits/rules about how you can use Cluster/Client instances.
+        // basically, we need one to submit the long run job and one to do the close operation that will cancel the
+        // long run job. it is probably possible to do this with some low-level message manipulation but that's
+        // probably not necessary
+        final Cluster cluster1 = Cluster.build().create();
+        final Client client1 = cluster1.connect(name.getMethodName());
+        client1.submit("graph.addVertex()").all().join();
+        final Cluster cluster2 = Cluster.build().create();
+        final Client.SessionSettings sessionSettings = Client.SessionSettings.build()
+                .sessionId(name.getMethodName())
+                .forceClosed(true).create();
+        final Client client2 = cluster2.connect(Client.Settings.build().useSession(sessionSettings).create());
+        client2.submit("1+1").all().join();
+
         final ResultSet rs = client1.submit("Thread.sleep(10000);1+1");
 
         client2.close();
 
+        // because the close was forced, the message should appear immediately
+        assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on " + name.getMethodName() + " - close was forced\n"));
+
         try {
             rs.all().join();
             fail("The close of the session on client2 should have interrupted the script sent on client1");