This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-tx-client in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit bee7717ede38615c844ebb26fd5bc8789ba75ee9 Author: Ken Hu <[email protected]> AuthorDate: Mon Mar 16 17:24:06 2026 -0700 gremlin-driver implementation of remote transactions --- .../traversal/dsl/graph/GraphTraversalSource.java | 13 +- .../apache/tinkerpop/gremlin/driver/Client.java | 206 ++-- .../apache/tinkerpop/gremlin/driver/Cluster.java | 195 +++- .../tinkerpop/gremlin/driver/Connection.java | 7 - .../tinkerpop/gremlin/driver/ConnectionPool.java | 23 +- .../tinkerpop/gremlin/driver/RequestOptions.java | 31 + .../tinkerpop/gremlin/driver/RequestSubmitter.java | 64 ++ .../gremlin/driver/RequestSubmitterAsync.java | 71 ++ .../driver/handler/HttpGremlinRequestEncoder.java | 7 + .../driver/remote/DriverRemoteConnection.java | 15 +- .../driver/remote/HttpRemoteTransaction.java | 315 ++++++ .../driver/remote/TransactionRemoteConnection.java | 113 +++ .../tinkerpop/gremlin/driver/ClientTest.java | 159 +++ .../GremlinDriverTransactionIntegrateTest.java | 1059 ++++++++++++++++++++ 14 files changed, 2051 insertions(+), 227 deletions(-) diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java index b1d417ee02..684d9e8609 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java @@ -703,14 +703,19 @@ public class GraphTraversalSource implements TraversalSource { /** * Proxies calls through to the underlying {@link Graph#tx()} or to the {@link RemoteConnection#tx()}. 
+ * <p> + * When a remote connection is present, this method delegates to the connection's + * {@link RemoteConnection#tx()} method, which returns an appropriate transaction + * implementation for the remote connection type (e.g., {@code HttpRemoteTransaction} + * for HTTP-based connections). + * + * @return A {@link Transaction} for managing transactional operations */ public Transaction tx() { if (null == this.connection) return this.graph.tx(); - else { - throw new UnsupportedOperationException("TinkerPop 4 does not yet support remote transactions"); - } - + else + return this.connection.tx(); } /** diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index 95af4270f5..0a5f518ac3 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -18,8 +18,6 @@ */ package org.apache.tinkerpop.gremlin.driver; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -27,22 +25,10 @@ import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLException; -import java.net.ConnectException; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; 
import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.stream.Collectors; /** * A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server. @@ -53,7 +39,7 @@ import java.util.stream.Collectors; * * @author Stephen Mallette (http://stephen.genoprime.com) */ -public abstract class Client { +public abstract class Client implements RequestSubmitter, RequestSubmitterAsync { private static final Logger logger = LoggerFactory.getLogger(Client.class); public static final String TOO_MANY_IN_FLIGHT_REQUESTS = "Number of active requests (%s) exceeds pool size (%s). " + @@ -62,8 +48,6 @@ public abstract class Client { protected final Cluster cluster; protected volatile boolean initialized; - private static final Random random = new Random(); - Client(final Cluster cluster) { this.cluster = cluster; } @@ -83,9 +67,10 @@ public abstract class Client { protected abstract void initializeImplementation(); /** - * Chooses a {@link Connection} to write the message to. + * Selects the {@link Host} to send the request to. Implementations define the selection strategy + * (load-balanced vs pinned). */ - protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException; + protected abstract Host selectHost(final RequestMessage msg); /** * Asynchronous close of the {@code Client}. 
@@ -239,6 +224,7 @@ public abstract class Client { options.getLanguage().ifPresent(lang -> request.addLanguage(lang)); options.getMaterializeProperties().ifPresent(mp -> request.addMaterializeProperties(mp)); options.getBulkResults().ifPresent(bulked -> request.addBulkResults(Boolean.parseBoolean(bulked))); + options.getTransactionId().ifPresent(transactionId -> request.addTransactionId(transactionId)); return submitAsync(request.create()); } @@ -255,9 +241,7 @@ public abstract class Client { final CompletableFuture<ResultSet> future = new CompletableFuture<>(); Connection connection = null; try { - // the connection is returned to the pool once the response has been completed...see Connection.write() - // the connection may be returned to the pool with the host being marked as "unavailable" - connection = chooseConnection(msg); + connection = cluster.borrowConnection(selectHost(msg)); connection.write(msg, future); return future; } catch (RuntimeException re) { @@ -292,9 +276,7 @@ public abstract class Client { */ public final static class ClusteredClient extends Client { - final ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>(); private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); - private Throwable initializationFailure = null; ClusteredClient(final Cluster cluster) { super(cluster); @@ -344,137 +326,32 @@ public abstract class Client { } /** - * Uses a {@link LoadBalancingStrategy} to choose the best {@link Host} and then selects the best connection - * from that host's connection pool. + * Uses a {@link LoadBalancingStrategy} to choose the best {@link Host}. */ @Override - protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { + protected Host selectHost(final RequestMessage msg) { final Iterator<Host> possibleHosts = this.cluster.loadBalancingStrategy().select(msg); - - // try a random host if none are marked available. 
maybe it will reconnect in the meantime. better than - // going straight to a fast NoHostAvailableException as was the case in versions 3.5.4 and earlier - final Host bestHost = possibleHosts.hasNext() ? possibleHosts.next() : chooseRandomHost(); - final ConnectionPool pool = hostConnectionPools.get(bestHost); - return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); - } - - private Host chooseRandomHost() { - final List<Host> hosts = new ArrayList<>(cluster.allHosts()); - final int ix = random.nextInt(hosts.size()); - return hosts.get(ix); + return possibleHosts.hasNext() ? possibleHosts.next() : cluster.randomHost(); } /** - * Initializes the connection pools on all hosts. + * No-op — connection pools are owned and closed by the {@link Cluster}. */ @Override protected void initializeImplementation() { - try { - CompletableFuture.allOf(cluster.allHosts().stream() - .map(host -> CompletableFuture.runAsync( - () -> initializeConnectionSetupForHost.accept(host), cluster.hostScheduler())) - .toArray(CompletableFuture[]::new)) - .join(); - } catch (CompletionException ex) { - logger.error("Initialization failed", ex); - this.initializationFailure = ex; - } - - // throw an error if there is no host available after initializing connection pool. we used - // to test cluster.availableHosts().isEmpty() but checking if we actually have hosts in - // the connection pool seems a bit more fireproof. if we look at initializeConnectionSetupForHost - // we can see that a successful initialization of the host/connection pool pair is followed by - // marking the host available and notifying the load balancing strategy. by relying directly on - // the state of hostConnectionPools we ensure that there is actually a concrete - // host/connection pool pair. 
even if the connection pool has immediate problems, it can fallback - // to its normal reconnection operation and won't put chooseConnection in a state where it can - // get a NPE if hostConnectionPools ends up being empty. it seems as if the safest minimum - // requirement for leaving this method is to ensure that at least one ConnectionPool constructor - // completed for at least one Host. - if (hostConnectionPools.isEmpty()) { - throwNoHostAvailableException(); - } - - // try to re-initiate any unavailable hosts in the background. - final List<Host> unavailableHosts = cluster.allHosts() - .stream().filter(host -> !host.isAvailable()).collect(Collectors.toList()); - if (!unavailableHosts.isEmpty()) { - handleUnavailableHosts(unavailableHosts); - } - } - - private void throwNoHostAvailableException() { - final Throwable rootCause = ExceptionUtils.getRootCause(initializationFailure); - // allow the certain exceptions to propagate as a cause - if (rootCause instanceof SSLException || rootCause instanceof ConnectException) { - throw new NoHostAvailableException(initializationFailure); - } else { - throw new NoHostAvailableException(); - } + // pools are initialized by Cluster.Manager.init() } /** - * Closes all the connection pools on all hosts. + * Marks this client as closed. Connection pools are managed by the {@link Cluster} and are not closed here. 
*/ @Override public synchronized CompletableFuture<Void> closeAsync() { - if (closing.get() != null) - return closing.get(); - - final CompletableFuture<Void> allPoolsClosedFuture = - CompletableFuture.allOf(hostConnectionPools.values().stream() - .map(ConnectionPool::closeAsync) - .toArray(CompletableFuture[]::new)); - - closing.set(allPoolsClosedFuture); + if (closing.get() != null) return closing.get(); + closing.set(CompletableFuture.completedFuture(null)); return closing.get(); } - private final Consumer<Host> initializeConnectionSetupForHost = host -> { - try { - // hosts that don't initialize connection pools will come up as a dead host. - hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this)); - - // hosts are not marked as available at cluster initialization and are made available here instead. - host.makeAvailable(); - - // added a new host to the cluster so let the load-balancer know. - ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host); - } catch (RuntimeException ex) { - final String errMsg = "Could not initialize client for " + host; - logger.error(errMsg); - throw ex; - } - }; - - private void handleUnavailableHosts(final List<Host> unavailableHosts) { - // start the re-initialization attempt for each of the unavailable hosts through Host.makeUnavailable(). - for (Host host : unavailableHosts) { - final CompletableFuture<Void> f = CompletableFuture.runAsync( - () -> host.makeUnavailable(this::tryReInitializeHost), cluster.hostScheduler()); - f.exceptionally(t -> { - logger.error("", (t.getCause() == null) ? t : t.getCause()); - return null; - }); - } - } - - /** - * Attempt to re-initialize the {@link Host} that was previously marked as unavailable. This method gets called - * as part of a schedule in {@link Host} to periodically try to re-initialize. 
- */ - public boolean tryReInitializeHost(final Host host) { - logger.debug("Trying to re-initiate host connection pool on {}", host); - - try { - initializeConnectionSetupForHost.accept(host); - return true; - } catch (Exception ex) { - logger.debug("Failed re-initialization attempt on {}", host, ex); - return false; - } - } - } /** @@ -530,12 +407,12 @@ public abstract class Client { } /** - * Delegates to the underlying {@link Client.ClusteredClient}. + * Delegates host selection to the underlying {@link Client.ClusteredClient}. */ @Override - protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { + protected Host selectHost(final RequestMessage msg) { if (close.isDone()) throw new IllegalStateException("Client is closed"); - return client.chooseConnection(msg); + return client.selectHost(msg); } @Override @@ -561,5 +438,54 @@ public abstract class Client { if (close.isDone()) throw new IllegalStateException("Client is closed"); return new AliasClusteredClient(client, graphOrTraversalSource); } + } + + /** + * A {@link Client} that pins all requests to a single {@link Host}. Used internally by transactions + * to ensure all requests within a transaction go to the same server. + * <p> + * This client is not intended to be used directly — obtain a {@link org.apache.tinkerpop.gremlin.structure.Transaction} + * via {@link Cluster#transact()} or {@link Cluster#transact(String)} instead. 
+ */ + public static class PinnedClient extends Client { + + private final Host pinnedHost; + private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); + + PinnedClient(final Cluster cluster, final Host pinnedHost) { + super(cluster); + this.pinnedHost = pinnedHost; + } + + public Host getPinnedHost() { + return pinnedHost; + } + + @Override + protected void initializeImplementation() { + initialized = true; // PinnedClient only borrows resources so it's technically always initialized + } + + @Override + protected Host selectHost(final RequestMessage msg) { + return pinnedHost; + } + + @Override + public boolean isClosing() { + return closing.get() != null; + } + + /** + * Marks this client as closed. The underlying pool is owned by {@link Cluster} and is not closed here. + */ + @Override + public synchronized CompletableFuture<Void> closeAsync() { + if (closing.get() != null) return closing.get(); + closing.set(CompletableFuture.completedFuture(null)); + return closing.get(); + } + } + } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index e080da8d51..329bf82111 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -31,7 +31,11 @@ import org.apache.commons.configuration2.Configuration; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.tuple.Pair; import org.apache.tinkerpop.gremlin.driver.auth.Auth; +import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; +import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor; +import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction; +import 
org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.util.MessageSerializer; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; @@ -46,7 +50,6 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.lang.ref.WeakReference; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; @@ -60,15 +63,20 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Random; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -81,6 +89,7 @@ import java.util.stream.Collectors; public final class Cluster { public static final String SERIALIZER_INTERCEPTOR_NAME = "serializer"; private static final Logger logger = LoggerFactory.getLogger(Cluster.class); + private static final Random random = new Random(); private final Manager manager; @@ -94,49 +103,42 @@ public final class Cluster { } /** - * Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to - * a single server (randomly selected from the cluster), where the same bindings will be available on each request. 
- * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a - * single request. The transactions are managed by the user and must be committed or rolled-back manually. - * <p/> - * Note that calling this method does not imply that a connection is made to the server itself at this point. - * Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an - * error will not be raised at this point. Connections get initialized in the {@link Client} when a request is - * submitted or can be directly initialized via {@link Client#init()}. - * - * @param sessionId user supplied id for the session which should be unique (a UUID is ideal). + * Creates a new {@link Client} based on the settings provided. */ - public <T extends Client> T connect(final String sessionId) { - throw new UnsupportedOperationException("not implemented"); + public <T extends Client> T connect() { + final Client client = new Client.ClusteredClient(this); + return (T) client; } /** - * Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to - * a single server (randomly selected from the cluster), where the same bindings will be available on each request. - * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a - * single request. If {@code manageTransactions} is set to {@code false} then transactions are managed by the - * user and must be committed or rolled-back manually. When set to {@code true} the transaction is committed or - * rolled-back at the end of each request. - * <p/> - * Note that calling this method does not imply that a connection is made to the server itself at this point. - * Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an - * error will not be raised at this point. 
Connections get initialized in the {@link Client} when a request is - * submitted or can be directly initialized via {@link Client#init()}. + * Creates a new {@link Transaction} using the server's default traversal source. + * The server will bind to "g" by default when no traversal source is specified. + */ + public HttpRemoteTransaction transact() { + return transact(null); + } + + /** + * Creates a new {@link Transaction} bound to the specified graph or traversal source. * - * @param sessionId user supplied id for the session which should be unique (a UUID is ideal). - * @param manageTransactions enables auto-transactions when set to true + * @param graphOrTraversalSource the graph/traversal source alias, or null to use the server default */ - public <T extends Client> T connect(final String sessionId, final boolean manageTransactions) { - throw new UnsupportedOperationException("not implemented"); + public HttpRemoteTransaction transact(final String graphOrTraversalSource) { + init(); + final Host host = randomHost(); + final Client.PinnedClient pinnedClient = new Client.PinnedClient(this, host); + return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource); } /** - * Creates a new {@link Client} based on the settings provided. + * Selects a random host from all known hosts. + * @return the randomly chosen Host. */ - public <T extends Client> T connect() { - final Client client = new Client.ClusteredClient(this); - manager.trackClient(client); - return (T) client; + Host randomHost() { + final List<Host> all = new ArrayList<>(allHosts()); + if (all.isEmpty()) throw new NoHostAvailableException(); + + return all.get(random.nextInt(all.size())); } @Override @@ -365,6 +367,32 @@ public final class Cluster { return Collections.unmodifiableCollection(manager.allHosts()); } + /** + * Returns the {@link ConnectionPool} for the given {@link Host}, or null if not found. 
+ */ + ConnectionPool getPoolFor(final Host host) { + return manager.getPoolFor(host); + } + + public void trackTransaction(final HttpRemoteTransaction tx) { + manager.trackTransaction(tx); + } + + public void untrackTransaction(final HttpRemoteTransaction tx) { + manager.untrackTransaction(tx); + } + + /** + * Borrows a {@link Connection} from the pool for the given {@link Host}. + * Throws {@link IllegalStateException} if the cluster is closing. + */ + Connection borrowConnection(final Host host) throws TimeoutException, ConnectionException { + if (isClosing()) throw new IllegalStateException("Cannot borrow a connection - cluster is closing"); + final ConnectionPool pool = manager.getPoolFor(host); + if (pool == null) throw new NoHostAvailableException(); + return pool.borrowConnection(connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); + } + Factory getFactory() { return manager.factory; } @@ -393,7 +421,7 @@ public final class Cluster { return manager.connectionPoolSettings; } - LoadBalancingStrategy loadBalancingStrategy() { + public LoadBalancingStrategy loadBalancingStrategy() { return manager.loadBalancingStrategy; } @@ -936,6 +964,8 @@ public final class Cluster { class Manager { private final ConcurrentMap<InetSocketAddress, Host> hosts = new ConcurrentHashMap<>(); + private final ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>(); + private final Set<HttpRemoteTransaction> openTransactions = ConcurrentHashMap.newKeySet(); private boolean initialized; private final List<InetSocketAddress> contactPoints; private final Factory factory; @@ -970,8 +1000,6 @@ public final class Cluster { private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>(); - private final List<WeakReference<Client>> openedClients = new ArrayList<>(); - private Manager(final Builder builder) { validateBuilder(builder); @@ -1075,10 +1103,72 @@ public final class Cluster { contactPoints.forEach(address -> 
{ final Host host = add(address); }); + + // initialize connection pools for all known hosts + try { + CompletableFuture.allOf(hosts.values().stream() + .map(host -> CompletableFuture.runAsync( + () -> initializeConnectionSetupForHost(host), hostScheduler)) + .toArray(CompletableFuture[]::new)) + .join(); + } catch (CompletionException ex) { + logger.error("Cluster pool initialization failed", ex); + } + + if (hostConnectionPools.isEmpty()) { + throw new NoHostAvailableException(); + } + + final List<Host> unavailableHosts = hosts.values().stream() + .filter(host -> !host.isAvailable()).collect(Collectors.toList()); + if (!unavailableHosts.isEmpty()) { + handleUnavailableHosts(unavailableHosts); + } + } + + void initializeConnectionSetupForHost(final Host host) { + try { + hostConnectionPools.put(host, new ConnectionPool(host, Cluster.this)); + host.makeAvailable(); + loadBalancingStrategy.onNew(host); + } catch (RuntimeException ex) { + logger.error("Could not initialize connection pool for {}", host); + throw ex; + } } - void trackClient(final Client client) { - openedClients.add(new WeakReference<>(client)); + void handleUnavailableHosts(final List<Host> unavailableHosts) { + for (Host host : unavailableHosts) { + final CompletableFuture<Void> f = CompletableFuture.runAsync( + () -> host.makeUnavailable(this::tryReInitializeHost), hostScheduler); + f.exceptionally(t -> { + logger.error("", (t.getCause() == null) ? 
t : t.getCause()); + return null; + }); + } + } + + public boolean tryReInitializeHost(final Host host) { + logger.debug("Trying to re-initiate host connection pool on {}", host); + try { + initializeConnectionSetupForHost(host); + return true; + } catch (Exception ex) { + logger.debug("Failed re-initialization attempt on {}", host, ex); + return false; + } + } + + ConnectionPool getPoolFor(final Host host) { + return hostConnectionPools.get(host); + } + + void trackTransaction(final HttpRemoteTransaction tx) { + openTransactions.add(tx); + } + + void untrackTransaction(final HttpRemoteTransaction tx) { + openTransactions.remove(tx); } public Host add(final InetSocketAddress address) { @@ -1096,25 +1186,22 @@ public final class Cluster { if (closeFuture.get() != null) return closeFuture.get(); - final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>(openedClients.size()); - for (WeakReference<Client> openedClient : openedClients) { - final Client client = openedClient.get(); - if (client != null) { - // best to call close() even if the Client is already closing so that we can be sure that - // any background client closing operations are included in this shutdown future - clientCloseFutures.add(client.closeAsync()); + // best-effort rollback of any open transactions before closing pools; snapshot first to avoid concurrent + // modification since rollback() calls untrackTransaction() + new ArrayList<>(openTransactions).forEach(tx -> { + try { + tx.rollback(); + } catch (Exception e) { + logger.warn("Failed to rollback transaction on cluster close", e); } - } + }); + openTransactions.clear(); - // when all the clients are fully closed then shutdown the netty event loop. not sure why this needs to - // block here, but if it doesn't then factory.shutdown() below doesn't seem to want to ever complete.
- // ideally, this should all be async, but i guess it wasn't before this change so just going to leave it - // for now as this really isn't the focus on this change - CompletableFuture.allOf(clientCloseFutures.toArray(new CompletableFuture[0])).join(); + // close all connection pools owned by the cluster + CompletableFuture.allOf(hostConnectionPools.values().stream(). + map(ConnectionPool::closeAsync).toArray(CompletableFuture[]::new)).join(); final CompletableFuture<Void> closeIt = new CompletableFuture<>(); - // shutdown the event loop. that shutdown can trigger some final jobs to get scheduled so add a listener - // to the termination event to shutdown remaining thread pools factory.shutdown().awaitUninterruptibly().addListener(f -> { executor.shutdown(); hostScheduler.shutdown(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index cb94b39b87..17b1beef65 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -57,7 +57,6 @@ final class Connection { private final URI uri; private final AtomicReference<ResultSet> pending = new AtomicReference<>(); private final Cluster cluster; - private final Client client; private final ConnectionPool pool; private final String creatingThread; private final String createdTimestamp; @@ -81,7 +80,6 @@ final class Connection { public Connection(final URI uri, final ConnectionPool pool) throws ConnectionException { this.uri = uri; this.cluster = pool.getCluster(); - this.client = pool.getClient(); this.pool = pool; this.creatingThread = Thread.currentThread().getName(); this.createdTimestamp = Instant.now().toString(); @@ -90,9 +88,6 @@ final class Connection { if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection with the cluster after close() is 
called"); - if (client.isClosing()) - throw new IllegalStateException("Cannot open a connection with the client after close() is called"); - final Bootstrap b = this.cluster.getFactory().createBootstrap(); try { channelizer = new Channelizer.HttpChannelizer(); @@ -111,8 +106,6 @@ final class Connection { // if the closeFuture is not set, it means that closeAsync() wasn't called if (thisConnection.closeFuture.get() == null) { if (!channel.hasAttr(IdleConnectionHandler.IDLE_STATE_EVENT)) { - // if idle state event is not present, it means the server closed the channel for some reason. - // it's important to distinguish that difference in debugging logger.error(String.format( "Server closed the Connection on channel %s - scheduling removal from %s", channel.id().asShortText(), thisConnection.pool.getPoolInfo(thisConnection))); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index ee4c34c5aa..3581e1db1a 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -57,7 +57,6 @@ final class ConnectionPool { public final Host host; private final Cluster cluster; - private final Client client; private final List<Connection> connections; private final AtomicInteger open; private final Queue<Connection> availableConnections = new ConcurrentLinkedQueue<>(); @@ -90,18 +89,17 @@ final class ConnectionPool { public void setTimeNow() { timeOfConnectionAttempt = System.currentTimeMillis(); } } - public ConnectionPool(final Host host, final Client client) { - this(host, client, Optional.empty()); + public ConnectionPool(final Host host, final Cluster cluster) { + this(host, cluster, Optional.empty()); } - public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMaxPoolSize) { - 
this(host, client, overrideMaxPoolSize, new ConnectionFactory.DefaultConnectionFactory()); + public ConnectionPool(final Host host, final Cluster cluster, final Optional<Integer> overrideMaxPoolSize) { + this(host, cluster, overrideMaxPoolSize, new ConnectionFactory.DefaultConnectionFactory()); } - ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMaxPoolSize, final ConnectionFactory connectionFactory) { + ConnectionPool(final Host host, final Cluster cluster, final Optional<Integer> overrideMaxPoolSize, final ConnectionFactory connectionFactory) { this.host = host; - this.client = client; - this.cluster = client.cluster; + this.cluster = cluster; this.connectionFactory = connectionFactory; poolLabel = "Connection Pool {host=" + host + "}"; @@ -207,10 +205,6 @@ final class ConnectionPool { } } - Client getClient() { - return client; - } - Cluster getCluster() { return cluster; } @@ -415,7 +409,8 @@ final class ConnectionPool { // Assumes that the root cause will give better information about why the connection failed. cause.append(ExceptionHelper.getRootCause(res.getFailureCause()).getMessage()); } else if (open.get() >= maxPoolSize) { - cause.append(String.format(Client.TOO_MANY_IN_FLIGHT_REQUESTS, open.get(), maxPoolSize)); + cause.append(String.format("Number of active requests (%s) exceeds pool size (%s). " + + "Consider increasing the value for maxConnectionPoolSize.", open.get(), maxPoolSize)); } else { cause.setLength(0); } @@ -479,7 +474,7 @@ final class ConnectionPool { // pool needs it. 
for now that seems like an unnecessary added bit of complexity for dealing with this // error state connection = connectionFactory.create(this); - final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create(); + final RequestMessage ping = cluster.validationRequest().create(); final CompletableFuture<ResultSet> f = new CompletableFuture<>(); connection.write(ping, f); f.get().all().get(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java index a43e186164..c51f2b8516 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java @@ -50,6 +50,7 @@ public final class RequestOptions { private final String language; private final String materializeProperties; private final String bulkResults; + private final String transactionId; private RequestOptions(final Builder builder) { this.graphOrTraversalSource = builder.graphOrTraversalSource; @@ -59,6 +60,7 @@ public final class RequestOptions { this.language = builder.language; this.materializeProperties = builder.materializeProperties; this.bulkResults = builder.bulkResults; + this.transactionId = builder.transactionId; } public Optional<String> getG() { @@ -85,6 +87,8 @@ public final class RequestOptions { public Optional<String> getBulkResults() { return Optional.ofNullable(bulkResults); } + public Optional<String> getTransactionId() { return Optional.ofNullable(transactionId); } + public static Builder build() { return new Builder(); } @@ -125,6 +129,25 @@ public final class RequestOptions { private String materializeProperties = null; private String language = null; private String bulkResults = null; + private String transactionId = null; + + /** + * Creates a {@link Builder} populated with the values from the provided {@link 
RequestOptions}. + * @param options the options to copy from + * @return a {@link Builder} with the copied options + */ + public static Builder from(final RequestOptions options) { + final Builder builder = build(); + builder.graphOrTraversalSource = options.graphOrTraversalSource; + builder.parameters = options.parameters; + builder.batchSize = options.batchSize; + builder.timeout = options.timeout; + builder.materializeProperties = options.materializeProperties; + builder.language = options.language; + builder.bulkResults = options.bulkResults; + builder.transactionId = options.transactionId; + return builder; + } /** * The aliases to set on the request. @@ -196,6 +219,14 @@ public final class RequestOptions { return this; } + /** + * Sets the transactionId value to be sent on the request. + */ + public Builder transactionId(final String id) { + this.transactionId = id; + return this; + } + public RequestOptions create() { return new RequestOptions(this); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java new file mode 100644 index 0000000000..e5fd74ad98 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import java.util.Map; + +/** + * Defines the synchronous request submission contract for Gremlin requests. + * <p> + * This interface is implemented by both {@link Client} and transaction classes + * to ensure a consistent API for submitting Gremlin scripts. The synchronous + * nature of these methods means they block until the request completes. + * <p> + * For asynchronous submission, see {@link RequestSubmitterAsync}. + * + * @author Apache TinkerPop + */ +public interface RequestSubmitter { + + /** + * Submits a Gremlin script and blocks until the response is received. + * + * @param gremlin the Gremlin script to execute + * @return the results of the script execution + */ + ResultSet submit(String gremlin); + + /** + * Submits a Gremlin script with bound parameters and blocks until the response is received. + * <p> + * Prefer this method over string concatenation when executing scripts with variable + * arguments, as parameterized scripts perform better. + * + * @param gremlin the Gremlin script to execute + * @param parameters a map of parameters that will be bound to the script on execution + * @return the results of the script execution + */ + ResultSet submit(String gremlin, Map<String, Object> parameters); + + /** + * Submits a Gremlin script with request options and blocks until the response is received. 
+ * + * @param gremlin the Gremlin script to execute + * @param options the options to supply for this request + * @return the results of the script execution + */ + ResultSet submit(String gremlin, RequestOptions options); +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java new file mode 100644 index 0000000000..eb700f881f --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Defines the asynchronous request submission contract for Gremlin requests. + * <p> + * This interface is implemented by {@link Client} to provide non-blocking + * request submission. The returned {@link CompletableFuture} completes when + * the write of the request is complete, not when the response is received. 
+ * <p> + * Note: Transaction classes intentionally do not implement this interface + * because transactional operations require sequential execution to maintain + * ordering guarantees over HTTP. + * <p> + * For synchronous submission, see {@link RequestSubmitter}. + * + * @author Apache TinkerPop + */ +public interface RequestSubmitterAsync { + + /** + * Submits a Gremlin script asynchronously. + * <p> + * The returned future completes when the write of the request is complete. + * + * @param gremlin the Gremlin script to execute + * @return a future that completes with the results when the request write is complete + */ + CompletableFuture<ResultSet> submitAsync(String gremlin); + + /** + * Submits a Gremlin script with bound parameters asynchronously. + * <p> + * Prefer this method over string concatenation when executing scripts with variable + * arguments, as parameterized scripts perform better. + * + * @param gremlin the Gremlin script to execute + * @param parameters a map of parameters that will be bound to the script on execution + * @return a future that completes with the results when the request write is complete + */ + CompletableFuture<ResultSet> submitAsync(String gremlin, Map<String, Object> parameters); + + /** + * Submits a Gremlin script with request options asynchronously. 
+ * + * @param gremlin the Gremlin script to execute + * @param options the options to supply for this request + * @return a future that completes with the results when the request write is complete + */ + CompletableFuture<ResultSet> submitAsync(String gremlin, RequestOptions options); +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java index c94d74264d..2b45be91ce 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java @@ -91,6 +91,13 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req if (bulkResults) { headersMap.put(Tokens.BULK_RESULTS, "true"); } + + // Add X-Transaction-Id header to comply with specification's dual transmission (header and body) + final String transactionId = requestMessage.getField(Tokens.ARGS_TRANSACTION_ID); + if (transactionId != null) { + headersMap.put("X-Transaction-Id", transactionId); + } + HttpRequest gremlinRequest = new HttpRequest(headersMap, requestMessage, uri); for (final Pair<String, ? extends RequestInterceptor> interceptor : interceptors) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index c8d56814c4..c909e48604 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -248,15 +248,14 @@ public class DriverRemoteConnection implements RemoteConnection { } /** - * Constructs a new {@link DriverRemoteTransaction}. 
Not yet supported in TinkerPop 4. + * Creates a new {@link HttpRemoteTransaction} for executing transactional operations. + * + * @return A new {@link HttpRemoteTransaction} */ -// @Override -// public Transaction tx() { -// // todo: not implemented -// final DriverRemoteConnection session = new DriverRemoteConnection( -// client.getCluster().connect(), remoteTraversalSourceName, true); -// return new DriverRemoteTransaction(session); -// } + @Override + public Transaction tx() { + return client.getCluster().transact(remoteTraversalSourceName); + } @Override public String toString() { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java new file mode 100644 index 0000000000..0dccb4cb1b --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.driver.remote; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Host; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.RequestSubmitter; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; +import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.util.TransactionException; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * A {@link Transaction} implementation for HTTP-based remote connections. + * <p> + * This class provides synchronous, sequential request execution within a transaction context. + * All requests are pinned to a single host and include the transaction ID (after begin()). + * <p> + * Key characteristics: + * <ul> + * <li>Synchronous API only - no submitAsync() methods</li> + * <li>Host pinning - all requests go to the same server</li> + * <li>Sequential execution - requests block until complete</li> + * </ul> + * <p> + * Usage: + * <pre> + * Transaction tx = cluster.transact("g"); + * GraphTraversalSource gtx = tx.begin(); + * gtx.addV("person").property("name", "alice").iterate(); + * tx.commit(); + * </pre> + * + * This class is <b>NOT</b> thread-safe. 
+ */ +public class HttpRemoteTransaction implements Transaction, RequestSubmitter { + private static final Logger logger = LoggerFactory.getLogger(HttpRemoteTransaction.class); + private static final long CLOSING_MAX_WAIT_MS = 100000; // TODO: revert back to 10k + + protected Consumer<Transaction> closeConsumer = CLOSE_BEHAVIOR.COMMIT; + private final Client.PinnedClient pinnedClient; + private final Cluster cluster; + private final Host pinnedHost; + private final String graphAlias; + private String transactionId; // null until begin(), set from server response + private TransactionState state = TransactionState.NOT_STARTED; + + private enum TransactionState { + NOT_STARTED, OPEN, CLOSED + } + + /** + * Creates a new HTTP transaction. + * <p> + * The transaction is not started until {@link #begin(Class)} is called. + * The pinned host and the cluster are both taken from the supplied {@code pinnedClient}. + * + * @param pinnedClient the underlying client for connection access; it supplies the pinned host and the cluster + * @param graphAlias the graph/traversal source alias (e.g., "g") + * @throws NoHostAvailableException if no hosts are available in the cluster + */ + public HttpRemoteTransaction(final Client.PinnedClient pinnedClient, final String graphAlias) { + this.pinnedClient = pinnedClient; + this.graphAlias = graphAlias; + this.pinnedHost = pinnedClient.getPinnedHost(); + this.cluster = pinnedClient.getCluster(); + } + + /** + * Starts the transaction by delegating to {@code begin()}. + * <p> + * Use {@link #begin(Class)} instead when a traversal source bound to this transaction is needed. + */ + @Override + public void open() { + begin(); + } + + /** + * Starts a transaction and returns a traversal source bound to it. + * <p> + * This method sends {@code g.tx().begin()} to the server, which returns + * the transaction ID. All subsequent requests will include this ID. 
+ * + * @param traversalSourceClass the class of the traversal source to create + * @param <T> the type of the traversal source + * @return a new traversal source bound to this transaction + * @throws IllegalStateException if the transaction is already started or the traversal source cannot be created + * @throws RuntimeException if the transaction fails to begin + */ + @Override + public <T extends TraversalSource> T begin(final Class<T> traversalSourceClass) { + if (state != TransactionState.NOT_STARTED) { + throw new IllegalStateException("Transaction already started"); + } + cluster.trackTransaction(this); + + try { + // Send begin - no txId attached yet + final ResultSet rs = submitInternal("g.tx().begin()"); + + // Server returns the transaction ID + this.transactionId = extractTransactionId(rs); + this.state = TransactionState.OPEN; + } catch (Exception e) { + cleanUp(); + throw new RuntimeException("Failed to begin transaction: " + e.getMessage(), e); + } + + // Create RemoteConnection for the traversal source + final TransactionRemoteConnection txConnection = new TransactionRemoteConnection(this); + + try { + return traversalSourceClass.getConstructor(RemoteConnection.class).newInstance(txConnection); + } catch (Exception e) { + // The transaction is already open on the server, so roll it back - but never let a failing + // rollback replace the real cause; attach it as a suppressed exception instead. + final IllegalStateException failure = new IllegalStateException("Failed to create TraversalSource", e); + try { + rollback(); + } catch (Exception rollbackFailure) { + failure.addSuppressed(rollbackFailure); + } + throw failure; + } + } + + /** + * Extracts the transaction ID from the begin() response. + * <p> + * The server returns the transaction ID as part of the response to g.tx().begin(). 
+ * + * @param rs the result set from the begin request + * @return the transaction ID + */ + private String extractTransactionId(final ResultSet rs) { + // Wait for all results and extract the transaction ID + final List<Result> results = rs.all().join(); + if (results.isEmpty()) { + throw new IllegalStateException("Server did not return transaction ID"); + } + // The first result is read as a Map whose "transactionId" entry holds the ID + final Object id = results.get(0).get(Map.class).get("transactionId"); + if (id == null) throw new IllegalStateException("Server did not return transaction ID"); + + final String idStr = id.toString(); + if (idStr.isBlank()) throw new IllegalStateException("Server returned empty transaction ID"); + + return idStr; + } + + /** + * Commits the transaction. + * <p> + * Sends {@code g.tx().commit()} to the server and, on success, closes the transaction. + * + * @throws IllegalStateException if the transaction is not open + * @throws TransactionException if the commit request fails or does not complete in time + */ + @Override + public void commit() { + closeRemoteTransaction("g.tx().commit()"); + } + + /** + * Rolls back the transaction. + * <p> + * Sends {@code g.tx().rollback()} to the server and, on success, closes the transaction. + * Requires an open transaction; a failed rollback is logged and rethrown as a {@link TransactionException}. 
+ */ + @Override + public void rollback() { + closeRemoteTransaction("g.tx().rollback()"); + } + + /** + * Sends the given closing script (commit or rollback) and waits up to {@code CLOSING_MAX_WAIT_MS} + * milliseconds for it to complete. On success the transaction is cleaned up; on failure its state + * is left untouched and the failure is rethrown. + * + * @param closeScript the script that ends the transaction on the server + * @throws IllegalStateException if the transaction is not open + * @throws TransactionException if the request fails or does not complete in time + */ + private void closeRemoteTransaction(final String closeScript) { + if (state != TransactionState.OPEN) throw new IllegalStateException("Transaction is not open"); + + try { + submitInternal(closeScript).all().get(CLOSING_MAX_WAIT_MS, TimeUnit.MILLISECONDS); + cleanUp(); + } catch (Exception e) { + logger.warn("Failed to {} transaction on {}", closeScript, pinnedHost); + throw new TransactionException("Failed to " + closeScript, e); + } + } + + /** + * Marks the transaction as closed and asks the cluster to stop tracking it. + */ + private void cleanUp() { + state = TransactionState.CLOSED; + cluster.untrackTransaction(this); + } + + /** + * Returns the server-generated transaction ID, or {@code null} if the transaction + * has not yet been started via {@link #begin(Class)}. + * + * @return the transaction ID, or null if not yet begun + */ + public String getTransactionId() { + return transactionId; + } + + @Override + public boolean isOpen() { + return state == TransactionState.OPEN; + } + + @Override + public void readWrite() { + throw new UnsupportedOperationException("Remote transaction behaviors are not configurable - they are always manually controlled"); + } + + /** + * Runs the configured close behavior and then cleans up the transaction. + * <p> + * The clean-up runs in a {@code finally} block so that the transaction is marked closed and untracked + * even when the close behavior throws (for example because the commit it issued failed). + */ + @Override + public void close() { + try { + closeConsumer.accept(this); + } finally { + // this is just for safety in case of custom or failing closeConsumer but should normally be handled by commit/rollback + cleanUp(); + } + } + + @Override + public Transaction onReadWrite(final Consumer<Transaction> consumer) { + throw new UnsupportedOperationException("Remote transaction behaviors are not configurable - they are always manually controlled"); + } + + @Override + public Transaction onClose(final Consumer<Transaction> consumer) { + this.closeConsumer = consumer; + return this; + } + + @Override + public void addTransactionListener(final Consumer<Status> listener) { + throw new UnsupportedOperationException("Remote transactions cannot have listeners attached"); + } + + @Override + public void removeTransactionListener(final 
Consumer<Status> listener) { + throw new UnsupportedOperationException("Remote transactions cannot have listeners attached"); + } + + @Override + public void clearTransactionListeners() { + throw new UnsupportedOperationException("Remote transactions cannot have listeners attached"); + } + + @Override + public ResultSet submit(final String gremlin) { + return submit(gremlin, RequestOptions.EMPTY); + } + + @Override + public ResultSet submit(final String gremlin, final Map<String, Object> parameters) { + final RequestOptions.Builder builder = RequestOptions.build(); + if (parameters != null && !parameters.isEmpty()) { + parameters.forEach(builder::addParameter); + } + return submit(gremlin, builder.create()); + } + + @Override + public ResultSet submit(final String gremlin, final RequestOptions options) { + if (state != TransactionState.OPEN) { + throw new IllegalStateException("Transaction is not open"); + } + return submitInternal(gremlin, options); + } + + private ResultSet submitInternal(final String gremlin) { + return submitInternal(gremlin, RequestOptions.EMPTY); + } + + // synchronized here is a bit defensive but ensures that even if a user accidentally uses this in different threads, + // the server will still receive the requests in the correct order + private synchronized ResultSet submitInternal(final String gremlin, final RequestOptions options) { + final RequestOptions.Builder builder = RequestOptions.Builder.from(options); + if (graphAlias != null) { + // Don't allow per-request override of "g" as transactions should only target a single Graph instance. 
+ builder.addG(graphAlias); + } + + // Attach txId if we have one (not present for begin()) + if (transactionId != null) { + builder.transactionId(transactionId); + } + + try { + return pinnedClient.submit(gremlin, builder.create()); + } catch (Exception e) { + throw new RuntimeException("Transaction request failed: " + e.getMessage(), e); + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java new file mode 100644 index 0000000000..0f9e903dd8 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.driver.remote; + +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; +import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.GremlinLang; +import org.apache.tinkerpop.gremlin.structure.Transaction; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.apache.tinkerpop.gremlin.driver.RequestOptions.getRequestOptions; + +/** + * A {@link RemoteConnection} that routes all submissions through an {@link HttpRemoteTransaction}. + * <p> + * This connection adapts the synchronous transaction API to the async {@link RemoteConnection} + * interface required by {@link org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource}. + * <p> + * Key characteristics: + * <ul> + * <li>Submissions are synchronous internally (via {@link HttpRemoteTransaction#submit})</li> + * <li>Returns completed futures to satisfy the async interface</li> + * <li>Transaction ID is automatically attached by the transaction</li> + * </ul> + */ +class TransactionRemoteConnection implements RemoteConnection { + + private final HttpRemoteTransaction transaction; + + /** + * Creates a new connection bound to the specified transaction. + * + * @param transaction the transaction that owns this connection + */ + TransactionRemoteConnection(final HttpRemoteTransaction transaction) { + this.transaction = transaction; + } + + /** + * Submits a traversal through the transaction. + * <p> + * The submission is synchronous internally but returns a completed future + * to satisfy the {@link RemoteConnection} interface. 
+ * + * @param gremlinLang the traversal to submit + * @param <E> the type of elements returned by the traversal + * @return a completed future with the traversal results + * @throws RemoteConnectionException if the transaction is not open or submission fails + */ + @Override + public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final GremlinLang gremlinLang) + throws RemoteConnectionException { + if (!transaction.isOpen()) { + throw new RemoteConnectionException("Transaction is not open"); + } + + try { + // Synchronous submission through transaction + final ResultSet rs = transaction.submit(gremlinLang.getGremlin(), getRequestOptions(gremlinLang)); + + final RemoteTraversal<?, E> traversal = new DriverRemoteTraversal<>(rs, + null, // client not needed for iteration + false, // attachElements + Optional.empty()); + + return CompletableFuture.completedFuture(traversal); + } catch (Exception e) { + throw new RemoteConnectionException(e); + } + } + + /** + * Returns the owning transaction. + * + * @return the transaction that owns this connection + */ + @Override + public Transaction tx() { + return transaction; + } + + /** + * No-op close implementation. + * <p> + * The transaction manages its own lifecycle - users should call + * {@link Transaction#commit()} or {@link Transaction#rollback()} explicitly. + */ + @Override + public void close() { + // Transaction manages its own lifecycle - don't close it here + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java new file mode 100644 index 0000000000..23443c1b3e --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link Client} subclasses verifying the refactored host selection + * and connection pool ownership model. + */ +public class ClientTest { + + /** + * ClusteredClient.selectHost() should return the host from the load balancing strategy. 
+ */ + @Test + public void shouldSelectHostViaLoadBalancingStrategy() { + final Host expectedHost = mock(Host.class); + final LoadBalancingStrategy strategy = mock(LoadBalancingStrategy.class); + when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator()); + + final Cluster cluster = mock(Cluster.class); + when(cluster.loadBalancingStrategy()).thenReturn(strategy); + + final Client.ClusteredClient client = new Client.ClusteredClient(cluster); + final RequestMessage msg = RequestMessage.build("g.V()").create(); + + final Host selected = client.selectHost(msg); + assertSame(expectedHost, selected); + } + + /** + * When the load balancing strategy returns an empty iterator, ClusteredClient.selectHost() + * should fall back to Cluster.randomHost(). + */ + @Test + public void shouldFallbackToRandomHostWhenStrategyReturnsEmpty() { + final Host fallbackHost = mock(Host.class); + final LoadBalancingStrategy strategy = mock(LoadBalancingStrategy.class); + when(strategy.select(any())).thenReturn(Collections.emptyIterator()); + + final Cluster cluster = mock(Cluster.class); + when(cluster.loadBalancingStrategy()).thenReturn(strategy); + when(cluster.randomHost()).thenReturn(fallbackHost); + + final Client.ClusteredClient client = new Client.ClusteredClient(cluster); + final RequestMessage msg = RequestMessage.build("g.V()").create(); + + final Host selected = client.selectHost(msg); + assertSame(fallbackHost, selected); + } + + /** + * PinnedClient.selectHost() should always return the pinned host regardless of the message. 
+ */ + @Test + public void shouldAlwaysReturnPinnedHost() { + final Host pinnedHost = mock(Host.class); + final Cluster cluster = mock(Cluster.class); + + final Client.PinnedClient client = new Client.PinnedClient(cluster, pinnedHost); + + final RequestMessage msg1 = RequestMessage.build("g.V()").create(); + final RequestMessage msg2 = RequestMessage.build("g.E()").create(); + + assertSame(pinnedHost, client.selectHost(msg1)); + assertSame(pinnedHost, client.selectHost(msg2)); + assertSame(pinnedHost, client.selectHost(null)); + } + + /** + * AliasClusteredClient.selectHost() should delegate to the underlying client's selectHost(). + */ + @Test + public void shouldDelegateSelectHostInAliasClient() { + final Host expectedHost = mock(Host.class); + final LoadBalancingStrategy strategy = mock(LoadBalancingStrategy.class); + when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator()); + + final Cluster cluster = mock(Cluster.class); + when(cluster.loadBalancingStrategy()).thenReturn(strategy); + + final Client.ClusteredClient underlying = new Client.ClusteredClient(cluster); + final Client.AliasClusteredClient aliasClient = new Client.AliasClusteredClient(underlying, "g"); + + final RequestMessage msg = RequestMessage.build("g.V()").create(); + final Host selected = aliasClient.selectHost(msg); + assertSame(expectedHost, selected); + } + + /** + * ClusteredClient.closeAsync() should return a completed future without closing any pools. 
+ */ + @Test + public void shouldNotClosePoolsOnClusteredClientClose() { + final Cluster cluster = mock(Cluster.class); + final Client.ClusteredClient client = new Client.ClusteredClient(cluster); + + assertFalse(client.isClosing()); + + final java.util.concurrent.CompletableFuture<Void> future = client.closeAsync(); + assertNotNull(future); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + assertTrue(client.isClosing()); + + // calling again should return the same future + assertSame(future, client.closeAsync()); + } + + /** + * PinnedClient.closeAsync() should return a completed future without closing any pools. + */ + @Test + public void shouldNotClosePoolsOnPinnedClientClose() { + final Host host = mock(Host.class); + final Cluster cluster = mock(Cluster.class); + final Client.PinnedClient client = new Client.PinnedClient(cluster, host); + + assertFalse(client.isClosing()); + + final java.util.concurrent.CompletableFuture<Void> future = client.closeAsync(); + assertNotNull(future); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + assertTrue(client.isClosing()); + + // calling again should return the same future + assertSame(future, client.closeAsync()); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java new file mode 100644 index 0000000000..c35116f020 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java @@ -0,0 +1,1059 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import org.apache.tinkerpop.gremlin.server.handler.TransactionManager; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.util.ExceptionHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringContains.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Integration tests for HTTP remote transactions using the driver 
API ({@code Cluster.transact()}). + * <p> + * Tests exercise {@link HttpRemoteTransaction} directly via its {@code submit()} methods + * rather than through {@link GraphTraversalSource}, which has its own unit tests. + * <p> + * Tests are numbered according to the HTTP Remote Transaction Test Plan. + * Server-side verification tests (raw HTTP) are in {@link GremlinServerHttpTransactionIntegrateTest}. + */ +public class GremlinDriverTransactionIntegrateTest extends AbstractGremlinServerIntegrationTest { + private static final Logger logger = LoggerFactory.getLogger(GremlinDriverTransactionIntegrateTest.class); + + private static final String GTX = "gtx"; + + private Cluster cluster; + + @Before + public void openCluster() { + cluster = TestClientFactory.open(); + } + + @After + public void closeCluster() throws Exception { + if (cluster != null) cluster.close(); + } + + /** + * Polls the given condition until it returns {@code true} or the timeout expires. + * Preferred over {@code Thread.sleep()} for timeout-dependent assertions to avoid + * flakiness in slower CI environments. 
+ */ + private static void awaitCondition(final long timeoutMs, final long pollMs, + final Callable<Boolean> condition) throws Exception { + final long deadline = System.currentTimeMillis() + timeoutMs; + while (System.currentTimeMillis() < deadline) { + if (condition.call()) return; + Thread.sleep(pollMs); + } + throw new AssertionError("Condition not met within " + timeoutMs + "ms"); + } + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + final String nameOfTest = name.getMethodName(); + switch (nameOfTest) { + case "shouldEnforceMaxConcurrentTransactions_22": + settings.maxConcurrentTransactions = 1; + break; + case "shouldTimeoutIdleTransaction_23": + case "shouldTimeoutIdleTransactionWithNoOperations_30": + settings.transactionTimeout = 1000; + break; + case "shouldTimeoutOnlyIdleTransactionNotActiveOne_31": + settings.transactionTimeout = 2000; + break; + case "shouldRejectLateCommitAfterTimeout_32": + settings.transactionTimeout = 1000; + break; + case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34": + settings.maxConcurrentTransactions = 1; + settings.transactionTimeout = 1000; + break; + case "shouldCommitBeforeTimeoutWins_37": + settings.transactionTimeout = 3000; + break; + case "shouldTimeoutAbandonedTransaction_39": + settings.transactionTimeout = 1000; + break; + case "shouldTrackTransactionCountAccurately_33": + settings.transactionTimeout = 2000; + break; + case "shouldRollbackAllOnServerShutdown_35": + break; + case "shouldShutdownWithinBoundedTime_36": + settings.perGraphCloseTimeout = 2000; + break; + } + return settings; + } + + /** + * Begin a transaction, add a vertex, verify isolation, commit, verify isOpen transitions, and verify data persists + * after commit. 
+ */ + @Test + public void shouldCommitTransaction_1_4_6_7() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + // #4: isOpen true after begin + assertTrue(tx.isOpen()); + + tx.submit("g.addV('person').property('name','alice')"); + + // #6: uncommitted data not visible outside the transaction + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + tx.commit(); + // #4: isOpen false after commit + assertFalse(tx.isOpen()); + // #1, #7: committed data visible to non-transactional reads + assertEquals(1L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + /** + * Combined test for #2, #5: Begin a transaction, add a vertex, rollback, + * verify isOpen is false and data is discarded. + */ + @Test + public void shouldRollbackTransaction_2_5() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + assertTrue(tx.isOpen()); + + tx.submit("g.addV('person').property('name','bob')"); + + tx.rollback(); + // #5: isOpen false after rollback + assertFalse(tx.isOpen()); + // #2: data discarded after rollback + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + // ========================================================================= + // B. Intra-Transaction Consistency — #3, #8, #9 combined + // ========================================================================= + + /** + * Combined test for #3, #8, #9: Begin a transaction, add vertices and edges as separate + * requests, verify read-your-own-writes within the transaction, commit, verify persistence. 
+ */ + @Test + public void shouldSupportIntraTransactionConsistency_3_8_9() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('test').property('name','A')"); + // #8: read-your-own-writes — vertex A visible within the transaction + assertEquals(1L, tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong()); + + tx.submit("g.addV('test').property('name','B')"); + tx.submit("g.V().has('name','A').addE('knows').to(V().has('name','B'))"); + + // verify the full subgraph is visible within the transaction before commit + assertEquals(2L, tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong()); + assertEquals(1L, tx.submit("g.V().outE('knows').count()").all().get().get(0).getLong()); + + tx.commit(); + + // #3: vertices and edges persist after commit + assertEquals(2L, client.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong()); + assertEquals(1L, client.submit("g.E().hasLabel('knows').count()").all().get().get(0).getLong()); + } + + /** + * #10: Submit on a committed transaction throws IllegalStateException. + */ + @Test + public void shouldThrowOnSubmitAfterCommit_10() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + + tx.submit("g.addV()"); + tx.commit(); + + try { + tx.submit("g.V().count()"); + fail("Expected exception on submit after commit"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + /** + * #11: Submit on a rolled-back transaction throws IllegalStateException. 
+ */ + @Test + public void shouldThrowOnSubmitAfterRollback_11() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + + tx.submit("g.addV()"); + tx.rollback(); + + try { + tx.submit("g.V().count()"); + fail("Expected exception on submit after rollback"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + /** + * #12: Calling begin() twice on the same HttpRemoteTransaction throws IllegalStateException. + */ + @Test + public void shouldThrowOnDoubleBegin_12() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + + try { + tx.begin(); + fail("Expected IllegalStateException on second begin()"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction already started")); + } + } + + /** + * #13: Calling commit() when not open throws IllegalStateException. + */ + @Test + public void shouldThrowOnCommitWhenNotOpen_13() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + assertFalse(tx.isOpen()); + + try { + tx.commit(); + fail("Expected IllegalStateException on commit when not open"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + /** + * #14: Calling rollback() when not open throws IllegalStateException. 
+ */ + @Test + public void shouldThrowOnRollbackWhenNotOpen_14() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + assertFalse(tx.isOpen()); + + try { + tx.rollback(); + fail("Expected IllegalStateException on rollback when not open"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + /** + * #15: getTransactionId() returns null on a freshly created, not-yet-begun transaction, + * and returns a non-null, non-blank value after begin(). + */ + @Test + public void shouldReturnNullTransactionIdBeforeBegin_15() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + + // before begin, transactionId should be null + assertNull(tx.getTransactionId()); + + tx.begin(); + // after begin, transactionId should be non-null and non-blank + assertNotNull(tx.getTransactionId()); + assertFalse(tx.getTransactionId().isBlank()); + } + + /** + * #16: Transaction.close() triggers the closeConsumer (default: commit). + */ + @Test + public void shouldCommitOnCloseByDefault_16() throws Exception { + final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); + tx1.begin(); + tx1.submit("g.addV('person').property('name','close_test')"); + // close() should trigger default COMMIT behavior + tx1.close(); + assertFalse(tx1.isOpen()); + + final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); + tx2.begin(); + assertEquals(1L, tx2.submit("g.V().hasLabel('person').count()").one().getLong()); + } + + /** + * #17: Transaction.onClose(ROLLBACK) then close() triggers rollback instead. 
+ */ + @Test + public void shouldRollbackOnCloseWhenConfigured_17() throws Exception { + final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); + tx1.onClose(Transaction.CLOSE_BEHAVIOR.ROLLBACK); + tx1.begin(); + tx1.submit("g.addV('person').property('name','rollback_close_test')"); + tx1.close(); + assertFalse(tx1.isOpen()); + + final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); + tx2.begin(); + assertEquals(0L, tx2.submit("g.V().hasLabel('person').count()").one().getLong()); + } + + /** + * #18: Cluster.close() rolls back all open transactions (best-effort). + */ + @Test + public void shouldRollbackOpenTransactionsOnClusterClose_18() throws Exception { + final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); + tx1.begin(); + tx1.submit("g.addV('cluster-close')"); + tx1.submit("g.addV('cluster-close')"); + + final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); + tx2.begin(); + tx2.submit("g.addV('cluster-close')"); + + // close cluster without committing — should rollback open transactions + cluster.close(); + cluster = null; + + // reconnect and verify data was not persisted + final Cluster cluster2 = TestClientFactory.open(); + try { + final HttpRemoteTransaction cluster2tx1 = cluster2.transact(GTX); + assertEquals(0L, cluster2tx1.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong()); + } finally { + cluster2.close(); + } + } + + /** + * #22: Server enforces maxConcurrentTransactions — returns error when limit exceeded. + * Configured with maxConcurrentTransactions=1. 
+ */ + @Test + public void shouldEnforceMaxConcurrentTransactions_22() throws Exception { + // first transaction fills the single slot + final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); + tx1.begin(); + + // second transaction should fail + try { + final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); + tx2.begin(); + fail("Expected exception when max concurrent transactions exceeded"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Maximum concurrent transactions exceeded")); + } + } + + /** + * #23: Server-side transaction timeout — idle transaction auto-rolls back after configured timeout. + * Configured with transactionTimeout=1000. + */ + @Test + public void shouldTimeoutIdleTransaction_23() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('timeout_test')"); + + // wait for the transaction to timeout + Thread.sleep(2000); + + // the transaction was rolled back server-side; attempting to commit should fail + try { + tx.commit(); + fail("Expected exception on commit after server-side timeout"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + + // verify data was not persisted + assertEquals(0L, client.submit("g.V().hasLabel('timeout_test').count()").all().get().get(0).getLong()); + } + + /** + * #30: Idle transaction with no operations times out. + * Configured with transactionTimeout=1000. 
+ */ + @Test + public void shouldTimeoutIdleTransactionWithNoOperations_30() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + + // wait for the transaction to timeout + Thread.sleep(2000); + + // attempting to commit should fail because the server rolled back + try { + tx.commit(); + fail("Expected exception on commit after server-side timeout"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + } + + /** + * #31: Concurrent transaction timeout independence — keep one active, leave the other idle. + * Only the idle one should timeout. Configured with transactionTimeout=2000. + */ + @Test + public void shouldTimeoutOnlyIdleTransactionNotActiveOne_31() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction txActive = (HttpRemoteTransaction) cluster.transact(GTX); + txActive.begin(); + txActive.submit("g.addV('active')"); + + final HttpRemoteTransaction txIdle = (HttpRemoteTransaction) cluster.transact(GTX); + txIdle.begin(); + txIdle.submit("g.addV('idle')"); + + // keep the active transaction alive by sending requests at intervals shorter than timeout + for (int i = 0; i < 3; i++) { + Thread.sleep(800); + txActive.submit("g.V().count()"); + } + + // by now the idle transaction should have timed out (2000ms elapsed) + // the active transaction should still be alive + txActive.commit(); + + // verify active transaction's data persisted + assertEquals(1L, client.submit("g.V().hasLabel('active').count()").all().get().get(0).getLong()); + + // idle transaction should have been rolled back by timeout + try { + txIdle.commit(); + fail("Expected exception on commit of timed-out idle transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not 
found")); + } + + // verify idle transaction's data was not persisted + assertEquals(0L, client.submit("g.V().hasLabel('idle').count()").all().get().get(0).getLong()); + } + + /** + * #32: Late commit after timeout — timeout fires and rolls back, then client sends commit. + * Configured with transactionTimeout=1000. + */ + @Test + public void shouldRejectLateCommitAfterTimeout_32() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('person').property('name','late_commit')"); + + // wait for timeout + Thread.sleep(2000); + + // attempt commit — should fail because server already rolled back + try { + tx.commit(); + fail("Expected exception on late commit after timeout"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + + // verify data was not persisted + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong()); + } + + /** + * #34: Timeout frees a slot under maxConcurrentTransactions. + * Configured with maxConcurrentTransactions=1 and transactionTimeout=1000. + */ + // TODO: get rid of this? it doesn't even have a driver side assert +// @Test +// public void shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34() throws Exception { +// // fill the single slot +// final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx1.begin(); +// +// // wait for timeout to reclaim the slot +// Thread.sleep(2000); +// +// // now a new transaction should succeed +// final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx2.begin(); +// assertTrue(tx2.isOpen()); +// tx2.rollback(); +// } + + /** + * #37: Commit queued before timeout wins — set a timeout, begin tx, add data, + * and immediately commit. 
The commit should succeed if it arrives before the timeout fires. + * Configured with transactionTimeout=3000 to avoid flakiness. + */ + // TODO: this just seems like a bad way to test this, consider removing or improving +// @Test +// public void shouldCommitBeforeTimeoutWins_37() throws Exception { +// final Client client = cluster.connect(); +// final RequestOptions gtxOptions = RequestOptions.build().addG(GTX).create(); +// +// final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); +// tx.begin(); +// +// tx.submit("g.addV('person').property('name','fast_commit')"); +// +// // commit immediately — should succeed before timeout fires +// tx.commit(); +// +// assertFalse(tx.isOpen()); +// assertEquals(1L, client.submit("g.V().hasLabel('person').count()", gtxOptions) +// .all().get().get(0).getLong()); +// } + + /** + * #39: Client disappears without commit or rollback — server-side transaction eventually + * times out and rolls back. Configured with transactionTimeout=1000. + */ + // TODO: consider transfering this to "server test" +// @Test +// public void shouldTimeoutAbandonedTransaction_39() throws Exception { +// final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); +// tx.begin(); +// tx.submit("g.addV('person').property('name','abandoned')"); +// +// // close cluster without committing +// cluster.close(); +// cluster = null; +// +// // wait for server-side timeout +// Thread.sleep(2000); +// +// // reconnect and verify data was not persisted +// final Cluster cluster2 = TestClientFactory.open(); +// try { +// final Client client2 = cluster2.connect(); +// assertEquals(0L, client2.submit("g.V().hasLabel('person').count()", +// RequestOptions.build().addG(GTX).create()).all().get().get(0).getLong()); +// } finally { +// cluster2.close(); +// } +// } + + /** + * #48: Multiple concurrent transactions (different transaction IDs) don't interfere. 
+ */ + @Test + public void shouldIsolateConcurrentTransactions_48() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); + tx1.begin(); + final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); + tx2.begin(); + + tx1.submit("g.addV('tx1')"); + tx2.submit("g.addV('tx2')"); + + // tx1 should not see tx2's data and vice versa + assertEquals(0L, tx1.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong()); + assertEquals(0L, tx2.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong()); + + tx1.commit(); + tx2.commit(); + + // both should be visible after commit + assertEquals(1L, client.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong()); + assertEquals(1L, client.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong()); + } + + /** + * #49: Open and close many transactions sequentially (stress test for resource cleanup). + */ + @Test + public void shouldOpenAndCloseManyTransactionsSequentially_49() throws Exception { + final Client client = cluster.connect().alias(GTX); + final long numberOfTransactions = 50; + + for (int i = 0; i < numberOfTransactions; i++) { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('stress')"); + tx.commit(); + } + + final long count = client.submit("g.V().hasLabel('stress').count()").all().get().get(0).getLong(); + assertEquals(numberOfTransactions, count); + + Thread.sleep(100); + // this should be 0, but to prevent flakiness, make it a reasonable number less than numberOfTransactions + assertTrue(server.getServerGremlinExecutor().getTransactionManager().getActiveTransactionCount() < 35); + } + + /** + * #50: Reuse transaction across threads — synchronized submitInternal serializes correctly. + */ + // TODO: consider removing as this isn't an advertised functionality. 
its not advertised to be thread-safe. +// @Test +// public void shouldSerializeSubmitsAcrossThreads_50() throws Exception { +// final Client client = cluster.connect(); +// final RequestOptions gtxOptions = RequestOptions.build().addG(GTX).create(); +// +// final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); +// tx.begin(); +// +// final int threadCount = 8; +// final CountDownLatch latch = new CountDownLatch(threadCount); +// final AtomicBoolean failed = new AtomicBoolean(false); +// +// for (int i = 0; i < threadCount; i++) { +// final int idx = i; +// new Thread(() -> { +// try { +// tx.submit("g.addV('threaded').property('idx'," + idx + ")"); +// } catch (Exception ex) { +// logger.error("Thread {} failed", idx, ex); +// failed.set(true); +// } finally { +// latch.countDown(); +// } +// }).start(); +// } +// +// assertTrue("Threads did not complete in time", latch.await(30, TimeUnit.SECONDS)); +// assertFalse("One or more threads failed", failed.get()); +// +// tx.commit(); +// +// assertEquals((long) threadCount, +// client.submit("g.V().hasLabel('threaded').count()", gtxOptions) +// .all().get().get(0).getLong()); +// } + + /** + * #52: Interleaved transactional and non-transactional requests on the same Cluster. + */ + // TODO: continue here + @Test + public void shouldIsolateTransactionalAndNonTransactionalRequests_52() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('tx_data')"); + + // non-transactional read should not see uncommitted tx data + assertEquals(0L, client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong()); + + tx.commit(); + + // now the data should be visible + assertEquals(1L, client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong()); + } + + /** + * #53: Begin on a graph that doesn't support transactions fails at begin time. 
+ */ + @Test + public void shouldRejectBeginOnNonTransactionalGraph_53() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact("gclassic"); + try { + tx.begin(); + fail("Expected exception when beginning transaction on non-transactional graph"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transactions not supported")); + } + } + + /** + * #54: Transaction targets the correct graph — vertex committed on gtx should not + * appear in gclassic. + */ + @Test + public void shouldTargetCorrectGraph_54() throws Exception { + final Client client = cluster.connect(); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + + tx.submit("g.addV('routed')"); + tx.commit(); + + // vertex should exist in the transactional graph (gtx) + assertEquals(1L, client.submit("g.V().hasLabel('routed').count()", + RequestOptions.build().addG(GTX).create()).all().get().get(0).getLong()); + + // vertex should NOT exist in the classic graph (gclassic) + assertEquals(0L, client.submit("g.V().hasLabel('routed').count()", + RequestOptions.build().addG("gclassic").create()).all().get().get(0).getLong()); + } + + /** + * #56: Non-transactional write is auto-committed. + */ + @Test + public void shouldAutoCommitNonTransactionalWrite_56() throws Exception { + final Client client = cluster.connect().alias(GTX); + + client.submit("g.addV('auto')").all().get(); + + assertEquals(1L, cluster.transact(GTX).submit("g.V().hasLabel('auto').count()").one().getLong()); + } + + /** + * #57: Non-transactional write is not affected by an open transaction on the same graph. 
+ */ + @Test + public void shouldNotAffectNonTxWriteWhenTxRolledBack_57() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); + tx.begin(); + // add vertex A in the transaction (don't commit) + tx.submit("g.addV('tx_vertex')"); + + // add vertex B non-transactionally + client.submit("g.addV('nontx_vertex')").all().get(); + + // vertex B should be immediately visible + assertEquals(1L, client.submit("g.V().hasLabel('nontx_vertex').count()") + .all().get().get(0).getLong()); + + // rollback the transaction + tx.rollback(); + + // vertex A should be gone, vertex B should persist + assertEquals(0L, client.submit("g.V().hasLabel('tx_vertex').count()").one().getLong()); + assertEquals(1L, client.submit("g.V().hasLabel('nontx_vertex').count()").one().getLong()); + } + + // TODO: Add test for submit(String, RequestOptions) — e.g. pass custom RequestOptions with parameters + // into a transaction and verify they are forwarded correctly alongside the transaction context. + + // TODO: Add test for submit(String, Map<String, Object>) — e.g. pass a parameterized traversal like + // "g.addV(label).property('name',name)" with a parameter map and verify it works within a transaction. + + /** + * #58: Cluster.transact() returns an HttpRemoteTransaction, calling begin() opens it, + * submit() sends traversals, commit() finalizes. + */ + // TODO: definitely get rid of this one right? its pretty much tested by 10 other tests. 
+// @Test +// public void shouldWorkWithExplicitTransactionObject_58() throws Exception { +// final Client client = cluster.connect().alias(GTX); +// +// final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); +// tx.begin(); +// +// tx.submit("g.addV('explicit_tx').property('name','test')"); +// tx.commit(); +// +// assertEquals(1L, client.submit("g.V().hasLabel('explicit_tx').count()", gtxOptions) +// .all().get().get(0).getLong()); +// } + + /** + * Combined test for #60, #61: Submit g.tx().commit() and g.tx().rollback() as + * gremlin-lang strings within a transaction via the HttpRemoteTransaction API. + */ + // TODO: move this to the Client test file because it can't be done with HttpRemoteTransaction +// @Test +// public void shouldSupportScriptBasedTransactionControl_60_61() throws Exception { +// final Client client = cluster.connect().alias(GTX); +// +// // #60: commit +// final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx1.begin(); +// tx1.submit("g.addV('script_commit')"); +// tx1.commit(); +// +// assertEquals(1L, client.submit("g.V().hasLabel('script_commit').count()", gtxOptions) +// .all().get().get(0).getLong()); +// +// // #61: rollback +// final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx2.begin(); +// tx2.submit("g.addV('script_rollback')"); +// tx2.rollback(); +// +// assertEquals(0L, client.submit("g.V().hasLabel('script_rollback').count()", gtxOptions) +// .all().get().get(0).getLong()); +// } + + /** + * #28: extractTransactionId() rejects empty/blank transaction IDs. + * <p> + * Verified indirectly: begin on a non-transactional graph fails server-side before + * a transactionId is ever assigned. The client cleans up properly. + */ + // TODO: best tested with a mock server. look into updating the ClientBehavioralTests with this. 
+// @Test +// public void shouldRejectEmptyTransactionId_28() throws Exception { +// final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact("gclassic"); +// +// try { +// tx.begin(); +// fail("Expected exception when beginning on non-transactional graph"); +// } catch (Exception ex) { +// // expected — server rejects the begin +// } +// +// // transaction should be cleaned up +// assertFalse(tx.isOpen()); +// assertNull(tx.getTransactionId()); +// } + + /** + * #29: If begin() fails, the client cleans up via closeInternal() — transaction moves + * to CLOSED state, isOpen() is false, getTransactionId() is null, and a second begin() + * throws because the state is no longer NOT_STARTED. + */ + @Test + public void shouldCleanUpOnBeginFailure_29() throws Exception { + final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact("gclassic"); + + try { + tx.begin(); + fail("Expected exception on begin for non-transactional graph"); + } catch (RuntimeException ex) { + assertThat(ex.getMessage(), containsString("Failed to begin transaction")); + } + + // verify cleanup: transaction is closed and has no ID + assertFalse(tx.isOpen()); + assertNull(tx.getTransactionId()); + + // second begin should fail — state moved to CLOSED, not back to NOT_STARTED + try { + tx.begin(); + fail("Expected IllegalStateException on begin after failed begin"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + /** + * #62: If a traversal within a transaction throws an error, the transaction remains + * open for further operations or explicit rollback. 
+     */
+    @Test
+    public void shouldKeepTransactionOpenAfterTraversalError_62() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+        final HttpRemoteTransaction openTx = (HttpRemoteTransaction) cluster.transact(GTX);
+
+        // stage one uncommitted vertex inside the transaction
+        openTx.begin();
+        openTx.submit("g.addV('good_vertex')");
+
+        // submit a traversal that is expected to fail; the error itself is deliberately ignored
+        // NOTE(review): no fail() follows this submit, so the test would also pass if the
+        // traversal did not throw — confirm whether submit() surfaces the error eagerly
+        // before tightening this.
+        try {
+            openTx.submit("g.V().fail()");
+        } catch (Exception ignored) {
+            // expected error from bad traversal
+        }
+
+        // the error must not have closed the transaction, so an explicit rollback still works
+        assertTrue(openTx.isOpen());
+        openTx.rollback();
+        assertFalse(openTx.isOpen());
+
+        // the staged vertex was rolled back and is not visible outside the transaction
+        final long remaining = client.submit("g.V().hasLabel('good_vertex').count()").all().get().get(0).getLong();
+        assertEquals(0L, remaining);
+    }
+
+    /**
+     * #33: Transaction count tracking — open N transactions, commit some, rollback others,
+     * let one time out. After all are resolved, verify the active count is zero.
+     * Configured with transactionTimeout=2000.
+     */
+    // TODO: consider removing as this is a server test. Make sure it's covered in another test file, then delete.
+// @Test +// public void shouldTrackTransactionCountAccurately_33() throws Exception { +// final TransactionManager txManager = server.getServerGremlinExecutor().getTransactionManager(); +// +// assertEquals(0, txManager.getActiveTransactionCount()); +// +// // open 3 transactions +// final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx1.begin(GraphTraversalSource.class); +// +// final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx2.begin(GraphTraversalSource.class); +// +// final HttpRemoteTransaction tx3 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx3.begin(GraphTraversalSource.class); +// +// assertEquals(3, txManager.getActiveTransactionCount()); +// +// // commit one +// tx1.commit(); +// assertEquals(2, txManager.getActiveTransactionCount()); +// +// // rollback one +// tx2.rollback(); +// assertEquals(1, txManager.getActiveTransactionCount()); +// +// // let the third one timeout (configured at 2000ms) +// Thread.sleep(3000); +// assertEquals(0, txManager.getActiveTransactionCount()); +// } + + /** + * #35: Server shutdown with multiple open transactions — open several transactions with + * uncommitted data, trigger server shutdown. Verify uncommitted data is not persisted. + */ + // TODO: like above, this is probably a server-side-only test; make sure it exists there, then delete.
+// @Test +// public void shouldRollbackAllOnServerShutdown_35() throws Exception { +// final TransactionManager txManager = server.getServerGremlinExecutor().getTransactionManager(); +// +// final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx1.begin(GraphTraversalSource.class); +// tx1.submit("g.addV('shutdown_test_1')"); +// +// final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx2.begin(GraphTraversalSource.class); +// tx2.submit("g.addV('shutdown_test_2')"); +// +// final HttpRemoteTransaction tx3 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx3.begin(GraphTraversalSource.class); +// tx3.submit("g.addV('shutdown_test_3')"); +// +// assertEquals(3, txManager.getActiveTransactionCount()); +// +// cluster.close(); +// cluster = null; +// +// // shut down the server — TransactionManager.shutdown() should rollback all +// stopServer(); +// +// // restart the server +// startServer(); +// +// // reconnect and verify no data was persisted +// final Cluster cluster2 = TestClientFactory.open(); +// try { +// final Client client2 = cluster2.connect(); +// +// +// assertEquals(0L, client2.submit("g.V().hasLabel('shutdown_test_1').count()", gtxOptions) +// .all().get().get(0).getLong()); +// assertEquals(0L, client2.submit("g.V().hasLabel('shutdown_test_2').count()", gtxOptions) +// .all().get().get(0).getLong()); +// assertEquals(0L, client2.submit("g.V().hasLabel('shutdown_test_3').count()", gtxOptions) +// .all().get().get(0).getLong()); +// +// assertEquals(0, server.getServerGremlinExecutor().getTransactionManager().getActiveTransactionCount()); +// } finally { +// cluster2.close(); +// } +// } + + /** + * #36: Server shutdown completes within bounded time. + * Configured with perGraphCloseTimeout=2000. + */ + // Again, server test, don't need it here, make sure it exists in server-test. 
+// @Test +// public void shouldShutdownWithinBoundedTime_36() throws Exception { +// final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx1.begin(GraphTraversalSource.class); +// tx1.submit("g.addV('shutdown_timing')"); +// +// final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) cluster.transact(GTX); +// tx2.begin(GraphTraversalSource.class); +// tx2.submit("g.addV('shutdown_timing')"); +// +// cluster.close(); +// cluster = null; +// +// final long start = System.currentTimeMillis(); +// stopServer(); +// final long elapsed = System.currentTimeMillis() - start; +// +// assertTrue("Shutdown took too long: " + elapsed + "ms", elapsed < 60000); +// +// // restart for tearDown +// startServer(); +// } + + /** + * #40: perGraphCloseTimeout bounds shutdown duration — verify shutdown does not hang + * beyond the configured timeout even with open transactions. + * Configured with perGraphCloseTimeout=2000. + */ + // TODO: this probably should just get deleted. It's too hard to test and not that valuable. +// @Test +// public void shouldBoundShutdownDurationWithOpenTransactions_40() throws Exception { +// final TransactionManager txManager = server.getServerGremlinExecutor().getTransactionManager(); +// +// for (int i = 0; i < 5; i++) { +// final HttpRemoteTransaction tx = (HttpRemoteTransaction) cluster.transact(GTX); +// tx.begin(); +// tx.submit("g.addV('timeout_bound_" + i + "')"); +// } +// +// assertEquals(5, txManager.getActiveTransactionCount()); +// +// cluster.close(); +// cluster = null; +// +// final long start = System.currentTimeMillis(); +// stopServer(); +// final long elapsed = System.currentTimeMillis() - start; +// +// assertTrue("Shutdown took too long: " + elapsed + "ms", elapsed < 60000); +// +// // restart for tearDown +// startServer(); +// } +}
