This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master-tx-client
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit bee7717ede38615c844ebb26fd5bc8789ba75ee9
Author: Ken Hu <[email protected]>
AuthorDate: Mon Mar 16 17:24:06 2026 -0700

    gremlin-driver implementation of remote transactions
---
 .../traversal/dsl/graph/GraphTraversalSource.java  |   13 +-
 .../apache/tinkerpop/gremlin/driver/Client.java    |  206 ++--
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  195 +++-
 .../tinkerpop/gremlin/driver/Connection.java       |    7 -
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |   23 +-
 .../tinkerpop/gremlin/driver/RequestOptions.java   |   31 +
 .../tinkerpop/gremlin/driver/RequestSubmitter.java |   64 ++
 .../gremlin/driver/RequestSubmitterAsync.java      |   71 ++
 .../driver/handler/HttpGremlinRequestEncoder.java  |    7 +
 .../driver/remote/DriverRemoteConnection.java      |   15 +-
 .../driver/remote/HttpRemoteTransaction.java       |  315 ++++++
 .../driver/remote/TransactionRemoteConnection.java |  113 +++
 .../tinkerpop/gremlin/driver/ClientTest.java       |  159 +++
 .../GremlinDriverTransactionIntegrateTest.java     | 1059 ++++++++++++++++++++
 14 files changed, 2051 insertions(+), 227 deletions(-)

diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
index b1d417ee02..684d9e8609 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
@@ -703,14 +703,19 @@ public class GraphTraversalSource implements 
TraversalSource {
 
     /**
      * Proxies calls through to the underlying {@link Graph#tx()} or to the 
{@link RemoteConnection#tx()}.
+     * <p>
+     * When a remote connection is present, this method delegates to the 
connection's
+     * {@link RemoteConnection#tx()} method, which returns an appropriate 
transaction
+     * implementation for the remote connection type (e.g., {@code 
HttpRemoteTransaction}
+     * for HTTP-based connections).
+     *
+     * @return A {@link Transaction} for managing transactional operations
      */
     public Transaction tx() {
         if (null == this.connection)
             return this.graph.tx();
-        else {
-            throw new UnsupportedOperationException("TinkerPop 4 does not yet 
support remote transactions");
-        }
-
+        else
+            return this.connection.tx();
     }
 
     /**
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 95af4270f5..0a5f518ac3 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,8 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -27,22 +25,10 @@ import 
org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLException;
-import java.net.ConnectException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 /**
  * A {@code Client} is constructed from a {@link Cluster} and represents a way 
to send messages to Gremlin Server.
@@ -53,7 +39,7 @@ import java.util.stream.Collectors;
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public abstract class Client {
+public abstract class Client implements RequestSubmitter, 
RequestSubmitterAsync {
 
     private static final Logger logger = LoggerFactory.getLogger(Client.class);
     public static final String TOO_MANY_IN_FLIGHT_REQUESTS = "Number of active 
requests (%s) exceeds pool size (%s). " +
@@ -62,8 +48,6 @@ public abstract class Client {
     protected final Cluster cluster;
     protected volatile boolean initialized;
 
-    private static final Random random = new Random();
-
     Client(final Cluster cluster) {
         this.cluster = cluster;
     }
@@ -83,9 +67,10 @@ public abstract class Client {
     protected abstract void initializeImplementation();
 
     /**
-     * Chooses a {@link Connection} to write the message to.
+     * Selects the {@link Host} to send the request to. Implementations define 
the selection strategy
+     * (load-balanced vs pinned).
      */
-    protected abstract Connection chooseConnection(final RequestMessage msg) 
throws TimeoutException, ConnectionException;
+    protected abstract Host selectHost(final RequestMessage msg);
 
     /**
      * Asynchronous close of the {@code Client}.
@@ -239,6 +224,7 @@ public abstract class Client {
         options.getLanguage().ifPresent(lang -> request.addLanguage(lang));
         options.getMaterializeProperties().ifPresent(mp -> 
request.addMaterializeProperties(mp));
         options.getBulkResults().ifPresent(bulked -> 
request.addBulkResults(Boolean.parseBoolean(bulked)));
+        options.getTransactionId().ifPresent(transactionId -> 
request.addTransactionId(transactionId));
 
         return submitAsync(request.create());
     }
@@ -255,9 +241,7 @@ public abstract class Client {
         final CompletableFuture<ResultSet> future = new CompletableFuture<>();
         Connection connection = null;
         try {
-            // the connection is returned to the pool once the response has 
been completed...see Connection.write()
-            // the connection may be returned to the pool with the host being 
marked as "unavailable"
-            connection = chooseConnection(msg);
+            connection = cluster.borrowConnection(selectHost(msg));
             connection.write(msg, future);
             return future;
         } catch (RuntimeException re) {
@@ -292,9 +276,7 @@ public abstract class Client {
      */
     public final static class ClusteredClient extends Client {
 
-        final ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new 
ConcurrentHashMap<>();
         private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
-        private Throwable initializationFailure = null;
 
         ClusteredClient(final Cluster cluster) {
             super(cluster);
@@ -344,137 +326,32 @@ public abstract class Client {
         }
 
         /**
-         * Uses a {@link LoadBalancingStrategy} to choose the best {@link 
Host} and then selects the best connection
-         * from that host's connection pool.
+         * Uses a {@link LoadBalancingStrategy} to choose the best {@link 
Host}.
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws 
TimeoutException, ConnectionException {
+        protected Host selectHost(final RequestMessage msg) {
             final Iterator<Host> possibleHosts = 
this.cluster.loadBalancingStrategy().select(msg);
-
-            // try a random host if none are marked available. maybe it will 
reconnect in the meantime. better than
-            // going straight to a fast NoHostAvailableException as was the 
case in versions 3.5.4 and earlier
-            final Host bestHost = possibleHosts.hasNext() ? 
possibleHosts.next() : chooseRandomHost();
-            final ConnectionPool pool = hostConnectionPools.get(bestHost);
-            return 
pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, 
TimeUnit.MILLISECONDS);
-        }
-
-        private Host chooseRandomHost() {
-            final List<Host> hosts = new ArrayList<>(cluster.allHosts());
-            final int ix = random.nextInt(hosts.size());
-            return hosts.get(ix);
+            return possibleHosts.hasNext() ? possibleHosts.next() : 
cluster.randomHost();
         }
 
         /**
-         * Initializes the connection pools on all hosts.
+         * No-op — connection pools are owned and closed by the {@link 
Cluster}.
          */
         @Override
         protected void initializeImplementation() {
-            try {
-                CompletableFuture.allOf(cluster.allHosts().stream()
-                                .map(host -> CompletableFuture.runAsync(
-                                        () -> 
initializeConnectionSetupForHost.accept(host), cluster.hostScheduler()))
-                                .toArray(CompletableFuture[]::new))
-                        .join();
-            } catch (CompletionException ex) {
-                logger.error("Initialization failed", ex);
-                this.initializationFailure = ex;
-            }
-
-            // throw an error if there is no host available after initializing 
connection pool. we used
-            // to test cluster.availableHosts().isEmpty() but checking if we 
actually have hosts in
-            // the connection pool seems a bit more fireproof. if we look at 
initializeConnectionSetupForHost
-            // we can see that a successful initialization of the 
host/connection pool pair is followed by
-            // marking the host available and notifying the load balancing 
strategy. by relying directly on
-            // the state of hostConnectionPools we ensure that there is 
actually a concrete
-            // host/connection pool pair. even if the connection pool has 
immediate problems, it can fallback
-            // to its normal reconnection operation and won't put 
chooseConnection in a state where it can
-            // get a NPE if hostConnectionPools ends up being empty. it seems 
as if the safest minimum
-            // requirement for leaving this method is to ensure that at least 
one ConnectionPool constructor
-            // completed for at least one Host.
-            if (hostConnectionPools.isEmpty()) {
-                throwNoHostAvailableException();
-            }
-
-            // try to re-initiate any unavailable hosts in the background.
-            final List<Host> unavailableHosts = cluster.allHosts()
-                    .stream().filter(host -> 
!host.isAvailable()).collect(Collectors.toList());
-            if (!unavailableHosts.isEmpty()) {
-                handleUnavailableHosts(unavailableHosts);
-            }
-        }
-
-        private void throwNoHostAvailableException() {
-            final Throwable rootCause = 
ExceptionUtils.getRootCause(initializationFailure);
-            // allow the certain exceptions to propagate as a cause
-            if (rootCause instanceof SSLException || rootCause instanceof 
ConnectException) {
-                throw new NoHostAvailableException(initializationFailure);
-            } else {
-                throw new NoHostAvailableException();
-            }
+            // pools are initialized by Cluster.Manager.init()
         }
 
         /**
-         * Closes all the connection pools on all hosts.
+         * Marks this client as closed. Connection pools are managed by the 
{@link Cluster} and are not closed here.
          */
         @Override
         public synchronized CompletableFuture<Void> closeAsync() {
-            if (closing.get() != null)
-                return closing.get();
-
-            final CompletableFuture<Void> allPoolsClosedFuture =
-                    
CompletableFuture.allOf(hostConnectionPools.values().stream()
-                            .map(ConnectionPool::closeAsync)
-                            .toArray(CompletableFuture[]::new));
-
-            closing.set(allPoolsClosedFuture);
+            if (closing.get() != null) return closing.get();
+            closing.set(CompletableFuture.completedFuture(null));
             return closing.get();
         }
 
-        private final Consumer<Host> initializeConnectionSetupForHost = host 
-> {
-            try {
-                // hosts that don't initialize connection pools will come up 
as a dead host.
-                hostConnectionPools.put(host, new ConnectionPool(host, 
ClusteredClient.this));
-
-                // hosts are not marked as available at cluster initialization 
and are made available here instead.
-                host.makeAvailable();
-
-                // added a new host to the cluster so let the load-balancer 
know.
-                
ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
-            } catch (RuntimeException ex) {
-                final String errMsg = "Could not initialize client for " + 
host;
-                logger.error(errMsg);
-                throw ex;
-            }
-        };
-
-        private void handleUnavailableHosts(final List<Host> unavailableHosts) 
{
-            // start the re-initialization attempt for each of the unavailable 
hosts through Host.makeUnavailable().
-            for (Host host : unavailableHosts) {
-                final CompletableFuture<Void> f = CompletableFuture.runAsync(
-                        () -> host.makeUnavailable(this::tryReInitializeHost), 
cluster.hostScheduler());
-                f.exceptionally(t -> {
-                    logger.error("", (t.getCause() == null) ? t : 
t.getCause());
-                    return null;
-                });
-            }
-        }
-
-        /**
-         * Attempt to re-initialize the {@link Host} that was previously 
marked as unavailable.  This method gets called
-         * as part of a schedule in {@link Host} to periodically try to 
re-initialize.
-         */
-        public boolean tryReInitializeHost(final Host host) {
-            logger.debug("Trying to re-initiate host connection pool on {}", 
host);
-
-            try {
-                initializeConnectionSetupForHost.accept(host);
-                return true;
-            } catch (Exception ex) {
-                logger.debug("Failed re-initialization attempt on {}", host, 
ex);
-                return false;
-            }
-        }
-
     }
 
     /**
@@ -530,12 +407,12 @@ public abstract class Client {
         }
 
         /**
-         * Delegates to the underlying {@link Client.ClusteredClient}.
+         * Delegates host selection to the underlying {@link 
Client.ClusteredClient}.
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws 
TimeoutException, ConnectionException {
+        protected Host selectHost(final RequestMessage msg) {
             if (close.isDone()) throw new IllegalStateException("Client is 
closed");
-            return client.chooseConnection(msg);
+            return client.selectHost(msg);
         }
 
         @Override
@@ -561,5 +438,54 @@ public abstract class Client {
             if (close.isDone()) throw new IllegalStateException("Client is 
closed");
             return new AliasClusteredClient(client, graphOrTraversalSource);
         }
+
     }
+
+    /**
+     * A {@link Client} that pins all requests to a single {@link Host}. Used 
internally by transactions
+     * to ensure all requests within a transaction go to the same server.
+     * <p>
+     * This client is not intended to be used directly — obtain a {@link 
org.apache.tinkerpop.gremlin.structure.Transaction}
+     * via {@link Cluster#transact()} or {@link Cluster#transact(String)} 
instead.
+     */
+    public static class PinnedClient extends Client {
+
+        private final Host pinnedHost;
+        private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
+
+        PinnedClient(final Cluster cluster, final Host pinnedHost) {
+            super(cluster);
+            this.pinnedHost = pinnedHost;
+        }
+
+        public Host getPinnedHost() {
+            return pinnedHost;
+        }
+
+        @Override
+        protected void initializeImplementation() {
+            initialized = true; // PinnedClient only borrows resources so it's 
technically always initialized
+        }
+
+        @Override
+        protected Host selectHost(final RequestMessage msg) {
+            return pinnedHost;
+        }
+
+        @Override
+        public boolean isClosing() {
+            return closing.get() != null;
+        }
+
+        /**
+         * Marks this client as closed. The underlying pool is owned by {@link 
Cluster} and is not closed here.
+         */
+        @Override
+        public synchronized CompletableFuture<Void> closeAsync() {
+            if (closing.get() != null) return closing.get();
+            closing.set(CompletableFuture.completedFuture(null));
+            return closing.get();
+        }
+    }
+
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index e080da8d51..329bf82111 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -31,7 +31,11 @@ import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tinkerpop.gremlin.driver.auth.Auth;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import 
org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor;
+import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
@@ -46,7 +50,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -60,15 +63,20 @@ import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -81,6 +89,7 @@ import java.util.stream.Collectors;
 public final class Cluster {
     public static final String SERIALIZER_INTERCEPTOR_NAME = "serializer";
     private static final Logger logger = 
LoggerFactory.getLogger(Cluster.class);
+    private static final Random random = new Random();
 
     private final Manager manager;
 
@@ -94,49 +103,42 @@ public final class Cluster {
     }
 
     /**
-     * Creates a SessionedClient instance to this {@code Cluster}, meaning 
requests will be routed to
-     * a single server (randomly selected from the cluster), where the same 
bindings will be available on each request.
-     * Requests are bound to the same thread on the server and thus 
transactions may extend beyond the bounds of a
-     * single request.  The transactions are managed by the user and must be 
committed or rolled-back manually.
-     * <p/>
-     * Note that calling this method does not imply that a connection is made 
to the server itself at this point.
-     * Therefore, if there is only one server specified in the {@code Cluster} 
and that server is not available an
-     * error will not be raised at this point.  Connections get initialized in 
the {@link Client} when a request is
-     * submitted or can be directly initialized via {@link Client#init()}.
-     *
-     * @param sessionId user supplied id for the session which should be 
unique (a UUID is ideal).
+     * Creates a new {@link Client} based on the settings provided.
      */
-    public <T extends Client> T connect(final String sessionId) {
-        throw new UnsupportedOperationException("not implemented");
+    public <T extends Client> T connect() {
+        final Client client = new Client.ClusteredClient(this);
+        return (T) client;
     }
 
     /**
-     * Creates a SessionedClient instance to this {@code Cluster}, meaning 
requests will be routed to
-     * a single server (randomly selected from the cluster), where the same 
bindings will be available on each request.
-     * Requests are bound to the same thread on the server and thus 
transactions may extend beyond the bounds of a
-     * single request.  If {@code manageTransactions} is set to {@code false} 
then transactions are managed by the
-     * user and must be committed or rolled-back manually. When set to {@code 
true} the transaction is committed or
-     * rolled-back at the end of each request.
-     * <p/>
-     * Note that calling this method does not imply that a connection is made 
to the server itself at this point.
-     * Therefore, if there is only one server specified in the {@code Cluster} 
and that server is not available an
-     * error will not be raised at this point.  Connections get initialized in 
the {@link Client} when a request is
-     * submitted or can be directly initialized via {@link Client#init()}.
+     * Creates a new {@link Transaction} using the server's default traversal 
source.
+     * The server will bind to "g" by default when no traversal source is 
specified.
+     */
+    public HttpRemoteTransaction transact() {
+        return transact(null);
+    }
+
+    /**
+     * Creates a new {@link Transaction} bound to the specified graph or 
traversal source.
      *
-     * @param sessionId user supplied id for the session which should be 
unique (a UUID is ideal).
-     * @param manageTransactions enables auto-transactions when set to true
+     * @param graphOrTraversalSource the graph/traversal source alias, or null 
to use the server default
      */
-    public <T extends Client> T connect(final String sessionId, final boolean 
manageTransactions) {
-        throw new UnsupportedOperationException("not implemented");
+    public HttpRemoteTransaction transact(final String graphOrTraversalSource) 
{
+        init();
+        final Host host = randomHost();
+        final Client.PinnedClient pinnedClient = new Client.PinnedClient(this, 
host);
+        return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource);
     }
 
     /**
-     * Creates a new {@link Client} based on the settings provided.
+     * Selects a random host from all known hosts.
+     * @return the randomly chosen Host.
      */
-    public <T extends Client> T connect() {
-        final Client client = new Client.ClusteredClient(this);
-        manager.trackClient(client);
-        return (T) client;
+    Host randomHost() {
+        final List<Host> all = new ArrayList<>(allHosts());
+        if (all.isEmpty()) throw new NoHostAvailableException();
+
+        return all.get(random.nextInt(all.size()));
     }
 
     @Override
@@ -365,6 +367,32 @@ public final class Cluster {
         return Collections.unmodifiableCollection(manager.allHosts());
     }
 
+    /**
+     * Returns the {@link ConnectionPool} for the given {@link Host}, or null 
if not found.
+     */
+    ConnectionPool getPoolFor(final Host host) {
+        return manager.getPoolFor(host);
+    }
+
+    public void trackTransaction(final HttpRemoteTransaction tx) {
+        manager.trackTransaction(tx);
+    }
+
+    public void untrackTransaction(final HttpRemoteTransaction tx) {
+        manager.untrackTransaction(tx);
+    }
+
+    /**
+     * Borrows a {@link Connection} from the pool for the given {@link Host}.
+     * Throws {@link IllegalStateException} if the cluster is closing.
+     */
+    Connection borrowConnection(final Host host) throws TimeoutException, 
ConnectionException {
+        if (isClosing()) throw new IllegalStateException("Cannot borrow a 
connection - cluster is closing");
+        final ConnectionPool pool = manager.getPoolFor(host);
+        if (pool == null) throw new NoHostAvailableException();
+        return 
pool.borrowConnection(connectionPoolSettings().maxWaitForConnection, 
TimeUnit.MILLISECONDS);
+    }
+
     Factory getFactory() {
         return manager.factory;
     }
@@ -393,7 +421,7 @@ public final class Cluster {
         return manager.connectionPoolSettings;
     }
 
-    LoadBalancingStrategy loadBalancingStrategy() {
+    public LoadBalancingStrategy loadBalancingStrategy() {
         return manager.loadBalancingStrategy;
     }
 
@@ -936,6 +964,8 @@ public final class Cluster {
 
     class Manager {
         private final ConcurrentMap<InetSocketAddress, Host> hosts = new 
ConcurrentHashMap<>();
+        private final ConcurrentMap<Host, ConnectionPool> hostConnectionPools 
= new ConcurrentHashMap<>();
+        private final Set<HttpRemoteTransaction> openTransactions = 
ConcurrentHashMap.newKeySet();
         private boolean initialized;
         private final List<InetSocketAddress> contactPoints;
         private final Factory factory;
@@ -970,8 +1000,6 @@ public final class Cluster {
 
         private final AtomicReference<CompletableFuture<Void>> closeFuture = 
new AtomicReference<>();
 
-        private final List<WeakReference<Client>> openedClients = new 
ArrayList<>();
-
         private Manager(final Builder builder) {
             validateBuilder(builder);
 
@@ -1075,10 +1103,72 @@ public final class Cluster {
             contactPoints.forEach(address -> {
                 final Host host = add(address);
             });
+
+            // initialize connection pools for all known hosts
+            try {
+                CompletableFuture.allOf(hosts.values().stream()
+                        .map(host -> CompletableFuture.runAsync(
+                                () -> initializeConnectionSetupForHost(host), 
hostScheduler))
+                        .toArray(CompletableFuture[]::new))
+                        .join();
+            } catch (CompletionException ex) {
+                logger.error("Cluster pool initialization failed", ex);
+            }
+
+            if (hostConnectionPools.isEmpty()) {
+                throw new NoHostAvailableException();
+            }
+
+            final List<Host> unavailableHosts = hosts.values().stream()
+                    .filter(host -> 
!host.isAvailable()).collect(Collectors.toList());
+            if (!unavailableHosts.isEmpty()) {
+                handleUnavailableHosts(unavailableHosts);
+            }
+        }
+
+        void initializeConnectionSetupForHost(final Host host) {
+            try {
+                hostConnectionPools.put(host, new ConnectionPool(host, 
Cluster.this));
+                host.makeAvailable();
+                loadBalancingStrategy.onNew(host);
+            } catch (RuntimeException ex) {
+                logger.error("Could not initialize connection pool for {}", 
host);
+                throw ex;
+            }
         }
 
-        void trackClient(final Client client) {
-            openedClients.add(new WeakReference<>(client));
+        void handleUnavailableHosts(final List<Host> unavailableHosts) {
+            for (Host host : unavailableHosts) {
+                final CompletableFuture<Void> f = CompletableFuture.runAsync(
+                        () -> host.makeUnavailable(this::tryReInitializeHost), 
hostScheduler);
+                f.exceptionally(t -> {
+                    logger.error("", (t.getCause() == null) ? t : 
t.getCause());
+                    return null;
+                });
+            }
+        }
+
+        public boolean tryReInitializeHost(final Host host) {
+            logger.debug("Trying to re-initiate host connection pool on {}", 
host);
+            try {
+                initializeConnectionSetupForHost(host);
+                return true;
+            } catch (Exception ex) {
+                logger.debug("Failed re-initialization attempt on {}", host, 
ex);
+                return false;
+            }
+        }
+
+        ConnectionPool getPoolFor(final Host host) {
+            return hostConnectionPools.get(host);
+        }
+
+        void trackTransaction(final HttpRemoteTransaction tx) {
+            openTransactions.add(tx);
+        }
+
+        void untrackTransaction(final HttpRemoteTransaction tx) {
+            openTransactions.remove(tx);
         }
 
         public Host add(final InetSocketAddress address) {
@@ -1096,25 +1186,22 @@ public final class Cluster {
             if (closeFuture.get() != null)
                 return closeFuture.get();
 
-            final List<CompletableFuture<Void>> clientCloseFutures = new 
ArrayList<>(openedClients.size());
-            for (WeakReference<Client> openedClient : openedClients) {
-                final Client client = openedClient.get();
-                if (client != null) {
-                    // best to call close() even if the Client is already 
closing so that we can be sure that
-                    // any background client closing operations are included 
in this shutdown future
-                    clientCloseFutures.add(client.closeAsync());
+            // best-effort rollback of any open transactions before closing 
pools snapshot to avoid concurrent
+            // modification since rollback() calls untrackTransaction()
+            new ArrayList<>(openTransactions).forEach(tx -> {
+                try {
+                    tx.rollback();
+                } catch (Exception e) {
+                    logger.warn("Failed to rollback transaction on cluster 
close", e);
                 }
-            }
+            });
+            openTransactions.clear();
 
-            // when all the clients are fully closed then shutdown the netty 
event loop. not sure why this needs to
-            // block here, but if it doesn't then factory.shutdown() below 
doesn't seem to want to ever complete.
-            // ideally, this should all be async, but i guess it wasn't before 
this change so just going to leave it
-            // for now as this really isn't the focus on this change
-            CompletableFuture.allOf(clientCloseFutures.toArray(new 
CompletableFuture[0])).join();
+            // close all connection pools owned by the cluster
+            CompletableFuture.allOf(hostConnectionPools.values().stream().
+                    
map(ConnectionPool::closeAsync).toArray(CompletableFuture[]::new)).join();
 
             final CompletableFuture<Void> closeIt = new CompletableFuture<>();
-            // shutdown the event loop. that shutdown can trigger some final 
jobs to get scheduled so add a listener
-            // to the termination event to shutdown remaining thread pools
             factory.shutdown().awaitUninterruptibly().addListener(f -> {
                 executor.shutdown();
                 hostScheduler.shutdown();
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index cb94b39b87..17b1beef65 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -57,7 +57,6 @@ final class Connection {
     private final URI uri;
     private final AtomicReference<ResultSet> pending = new AtomicReference<>();
     private final Cluster cluster;
-    private final Client client;
     private final ConnectionPool pool;
     private final String creatingThread;
     private final String createdTimestamp;
@@ -81,7 +80,6 @@ final class Connection {
     public Connection(final URI uri, final ConnectionPool pool) throws 
ConnectionException {
         this.uri = uri;
         this.cluster = pool.getCluster();
-        this.client = pool.getClient();
         this.pool = pool;
         this.creatingThread = Thread.currentThread().getName();
         this.createdTimestamp = Instant.now().toString();
@@ -90,9 +88,6 @@ final class Connection {
         if (cluster.isClosing())
             throw new IllegalStateException("Cannot open a connection with the 
cluster after close() is called");
 
-        if (client.isClosing())
-            throw new IllegalStateException("Cannot open a connection with the 
client after close() is called");
-
         final Bootstrap b = this.cluster.getFactory().createBootstrap();
         try {
             channelizer = new Channelizer.HttpChannelizer();
@@ -111,8 +106,6 @@ final class Connection {
                 // if the closeFuture is not set, it means that closeAsync() 
wasn't called
                 if (thisConnection.closeFuture.get() == null) {
                     if 
(!channel.hasAttr(IdleConnectionHandler.IDLE_STATE_EVENT)) {
-                        // if idle state event is not present, it means the 
server closed the channel for some reason.
-                        // it's important to distinguish that difference in 
debugging
                         logger.error(String.format(
                                 "Server closed the Connection on channel %s - 
scheduling removal from %s",
                                 channel.id().asShortText(), 
thisConnection.pool.getPoolInfo(thisConnection)));
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee4c34c5aa..3581e1db1a 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -57,7 +57,6 @@ final class ConnectionPool {
 
     public final Host host;
     private final Cluster cluster;
-    private final Client client;
     private final List<Connection> connections;
     private final AtomicInteger open;
     private final Queue<Connection> availableConnections = new 
ConcurrentLinkedQueue<>();
@@ -90,18 +89,17 @@ final class ConnectionPool {
         public void setTimeNow() { timeOfConnectionAttempt = 
System.currentTimeMillis(); }
     }
 
-    public ConnectionPool(final Host host, final Client client) {
-        this(host, client, Optional.empty());
+    public ConnectionPool(final Host host, final Cluster cluster) {
+        this(host, cluster, Optional.empty());
     }
 
-    public ConnectionPool(final Host host, final Client client, final 
Optional<Integer> overrideMaxPoolSize) {
-        this(host, client, overrideMaxPoolSize, new 
ConnectionFactory.DefaultConnectionFactory());
+    public ConnectionPool(final Host host, final Cluster cluster, final 
Optional<Integer> overrideMaxPoolSize) {
+        this(host, cluster, overrideMaxPoolSize, new 
ConnectionFactory.DefaultConnectionFactory());
     }
 
-    ConnectionPool(final Host host, final Client client, final 
Optional<Integer> overrideMaxPoolSize, final ConnectionFactory 
connectionFactory) {
+    ConnectionPool(final Host host, final Cluster cluster, final 
Optional<Integer> overrideMaxPoolSize, final ConnectionFactory 
connectionFactory) {
         this.host = host;
-        this.client = client;
-        this.cluster = client.cluster;
+        this.cluster = cluster;
         this.connectionFactory = connectionFactory;
         poolLabel = "Connection Pool {host=" + host + "}";
 
@@ -207,10 +205,6 @@ final class ConnectionPool {
         }
     }
 
-    Client getClient() {
-        return client;
-    }
-
     Cluster getCluster() {
         return cluster;
     }
@@ -415,7 +409,8 @@ final class ConnectionPool {
             // Assumes that the root cause will give better information about 
why the connection failed.
             
cause.append(ExceptionHelper.getRootCause(res.getFailureCause()).getMessage());
         } else if (open.get() >= maxPoolSize) {
-            cause.append(String.format(Client.TOO_MANY_IN_FLIGHT_REQUESTS, 
open.get(), maxPoolSize));
+            cause.append(String.format("Number of active requests (%s) exceeds 
pool size (%s). " +
+                    "Consider increasing the value for 
maxConnectionPoolSize.", open.get(), maxPoolSize));
         } else {
             cause.setLength(0);
         }
@@ -479,7 +474,7 @@ final class ConnectionPool {
             // pool needs it. for now that seems like an unnecessary added bit 
of complexity for dealing with this
             // error state
             connection = connectionFactory.create(this);
-            final RequestMessage ping = 
client.buildMessage(cluster.validationRequest()).create();
+            final RequestMessage ping = cluster.validationRequest().create();
             final CompletableFuture<ResultSet> f = new CompletableFuture<>();
             connection.write(ping, f);
             f.get().all().get();
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
index a43e186164..c51f2b8516 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
@@ -50,6 +50,7 @@ public final class RequestOptions {
     private final String language;
     private final String materializeProperties;
     private final String bulkResults;
+    private final String transactionId;
 
     private RequestOptions(final Builder builder) {
         this.graphOrTraversalSource = builder.graphOrTraversalSource;
@@ -59,6 +60,7 @@ public final class RequestOptions {
         this.language = builder.language;
         this.materializeProperties = builder.materializeProperties;
         this.bulkResults = builder.bulkResults;
+        this.transactionId = builder.transactionId;
     }
 
     public Optional<String> getG() {
@@ -85,6 +87,8 @@ public final class RequestOptions {
 
     public Optional<String> getBulkResults() { return 
Optional.ofNullable(bulkResults); }
 
+    public Optional<String> getTransactionId() { return 
Optional.ofNullable(transactionId); }
+
     public static Builder build() {
         return new Builder();
     }
@@ -125,6 +129,25 @@ public final class RequestOptions {
         private String materializeProperties = null;
         private String language = null;
         private String bulkResults = null;
+        private String transactionId = null;
+
+        /**
+         * Creates a {@link Builder} populated with the values from the 
provided {@link RequestOptions}.
+         * @param options the options to copy from
+         * @return a {@link Builder} with the copied options
+         */
+        public static Builder from(final RequestOptions options) {
+            final Builder builder = build();
+            builder.graphOrTraversalSource = options.graphOrTraversalSource;
+            builder.parameters = options.parameters;
+            builder.batchSize = options.batchSize;
+            builder.timeout = options.timeout;
+            builder.materializeProperties = options.materializeProperties;
+            builder.language = options.language;
+            builder.bulkResults = options.bulkResults;
+            builder.transactionId = options.transactionId;
+            return builder;
+        }
 
         /**
          * The aliases to set on the request.
@@ -196,6 +219,14 @@ public final class RequestOptions {
             return this;
         }
 
+        /**
+         * Sets the transactionId value to be sent on the request.
+         */
+        public Builder transactionId(final String id) {
+            this.transactionId = id;
+            return this;
+        }
+
         public RequestOptions create() {
             return new RequestOptions(this);
         }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java
new file mode 100644
index 0000000000..e5fd74ad98
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import java.util.Map;
+
+/**
+ * Defines the synchronous request submission contract for Gremlin requests.
+ * <p>
+ * This interface is implemented by both {@link Client} and transaction classes
+ * to ensure a consistent API for submitting Gremlin scripts. The synchronous
+ * nature of these methods means they block until the request completes.
+ * <p>
+ * For asynchronous submission, see {@link RequestSubmitterAsync}.
+ *
+ * @author Apache TinkerPop
+ */
+public interface RequestSubmitter {
+
+    /**
+     * Submits a Gremlin script and blocks until the response is received.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @return the results of the script execution
+     */
+    ResultSet submit(String gremlin);
+
+    /**
+     * Submits a Gremlin script with bound parameters and blocks until the 
response is received.
+     * <p>
+     * Prefer this method over string concatenation when executing scripts 
with variable
+     * arguments, as parameterized scripts perform better.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param parameters a map of parameters that will be bound to the script 
on execution
+     * @return the results of the script execution
+     */
+    ResultSet submit(String gremlin, Map<String, Object> parameters);
+
+    /**
+     * Submits a Gremlin script with request options and blocks until the 
response is received.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param options the options to supply for this request
+     * @return the results of the script execution
+     */
+    ResultSet submit(String gremlin, RequestOptions options);
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java
new file mode 100644
index 0000000000..eb700f881f
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Defines the asynchronous request submission contract for Gremlin requests.
+ * <p>
+ * This interface is implemented by {@link Client} to provide non-blocking
+ * request submission. The returned {@link CompletableFuture} completes when
+ * the write of the request is complete, not when the response is received.
+ * <p>
+ * Note: Transaction classes intentionally do not implement this interface
+ * because transactional operations require sequential execution to maintain
+ * ordering guarantees over HTTP.
+ * <p>
+ * For synchronous submission, see {@link RequestSubmitter}.
+ *
+ * @author Apache TinkerPop
+ */
+public interface RequestSubmitterAsync {
+
+    /**
+     * Submits a Gremlin script asynchronously.
+     * <p>
+     * The returned future completes when the write of the request is complete.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @return a future that completes with the results when the request write 
is complete
+     */
+    CompletableFuture<ResultSet> submitAsync(String gremlin);
+
+    /**
+     * Submits a Gremlin script with bound parameters asynchronously.
+     * <p>
+     * Prefer this method over string concatenation when executing scripts 
with variable
+     * arguments, as parameterized scripts perform better.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param parameters a map of parameters that will be bound to the script 
on execution
+     * @return a future that completes with the results when the request write 
is complete
+     */
+    CompletableFuture<ResultSet> submitAsync(String gremlin, Map<String, 
Object> parameters);
+
+    /**
+     * Submits a Gremlin script with request options asynchronously.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param options the options to supply for this request
+     * @return a future that completes with the results when the request write 
is complete
+     */
+    CompletableFuture<ResultSet> submitAsync(String gremlin, RequestOptions 
options);
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index c94d74264d..2b45be91ce 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -91,6 +91,13 @@ public final class HttpGremlinRequestEncoder extends 
MessageToMessageEncoder<Req
             if (bulkResults) {
                 headersMap.put(Tokens.BULK_RESULTS, "true");
             }
+            
+            // Add X-Transaction-Id header to comply with specification's dual 
transmission (header and body)
+            final String transactionId = 
requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
+            if (transactionId != null) {
+                headersMap.put("X-Transaction-Id", transactionId);
+            }
+            
             HttpRequest gremlinRequest = new HttpRequest(headersMap, 
requestMessage, uri);
 
             for (final Pair<String, ? extends RequestInterceptor> interceptor 
: interceptors) {
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index c8d56814c4..c909e48604 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -248,15 +248,14 @@ public class DriverRemoteConnection implements 
RemoteConnection {
     }
 
     /**
-     * Constructs a new {@link DriverRemoteTransaction}. Not yet supported in 
TinkerPop 4.
+     * Creates a new {@link HttpRemoteTransaction} for executing transactional 
operations.
+     *
+     * @return A new {@link HttpRemoteTransaction}
      */
-//    @Override
-//    public Transaction tx() {
-//        // todo: not implemented
-//        final DriverRemoteConnection session = new DriverRemoteConnection(
-//                client.getCluster().connect(), remoteTraversalSourceName, 
true);
-//        return new DriverRemoteTransaction(session);
-//    }
+    @Override
+    public Transaction tx() {
+        return client.getCluster().transact(remoteTraversalSourceName);
+    }
 
     @Override
     public String toString() {
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
new file mode 100644
index 0000000000..0dccb4cb1b
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Host;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.RequestSubmitter;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TransactionException;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A {@link Transaction} implementation for HTTP-based remote connections.
+ * <p>
+ * This class provides synchronous, sequential request execution within a 
transaction context.
+ * All requests are pinned to a single host and include the transaction ID 
(after begin()).
+ * <p>
+ * Key characteristics:
+ * <ul>
+ *   <li>Synchronous API only - no submitAsync() methods</li>
+ *   <li>Host pinning - all requests go to the same server</li>
+ *   <li>Sequential execution - requests block until complete</li>
+ * </ul>
+ * <p>
+ * Usage:
+ * <pre>
+ * Transaction tx = cluster.transact("g");
+ * GraphTraversalSource gtx = tx.begin();
+ * gtx.addV("person").property("name", "alice").iterate();
+ * tx.commit();
+ * </pre>
+ *
+ * This class is <b>NOT</b> thread-safe.
+ */
+public class HttpRemoteTransaction implements Transaction, RequestSubmitter {
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpRemoteTransaction.class);
+    private static final long CLOSING_MAX_WAIT_MS = 100000; // TODO: revert 
back to 10k
+
+    protected Consumer<Transaction> closeConsumer = CLOSE_BEHAVIOR.COMMIT;
+    private final Client.PinnedClient pinnedClient;
+    private final Cluster cluster;
+    private final Host pinnedHost;
+    private final String graphAlias;
+    private String transactionId;  // null until begin(), set from server 
response
+    private TransactionState state = TransactionState.NOT_STARTED;
+
+    private enum TransactionState {
+        NOT_STARTED, OPEN, CLOSED
+    }
+
+    /**
+     * Creates a new HTTP transaction.
+     * <p>
+     * The transaction is not started until {@link #begin(Class)} is called.
+     * A host is selected at creation time and all requests will be pinned to 
it.
+     *
+     * @param pinnedClient the underlying client for connection access
+     * @param graphAlias the graph/traversal source alias (e.g., "g")
+     * @throws NoHostAvailableException if no hosts are available in the 
cluster
+     */
+    public HttpRemoteTransaction(final Client.PinnedClient pinnedClient, final 
String graphAlias) {
+        this.pinnedClient = pinnedClient;
+        this.graphAlias = graphAlias;
+        this.pinnedHost = pinnedClient.getPinnedHost();
+        this.cluster = pinnedClient.getCluster();
+    }
+
+    /**
+     * Not supported for remote transactions. Use {@link #begin(Class)} 
instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void open() {
+        begin();
+    }
+
+    /**
+     * Starts a transaction and returns a traversal source bound to it.
+     * <p>
+     * This method sends {@code g.tx().begin()} to the server, which returns
+     * the transaction ID. All subsequent requests will include this ID.
+     *
+     * @param traversalSourceClass the class of the traversal source to create
+     * @param <T> the type of the traversal source
+     * @return a new traversal source bound to this transaction
+     * @throws IllegalStateException if the transaction is already started
+     * @throws RuntimeException if the transaction fails to begin
+     */
+    @Override
+    public <T extends TraversalSource> T begin(final Class<T> 
traversalSourceClass) {
+        if (state != TransactionState.NOT_STARTED) {
+            throw new IllegalStateException("Transaction already started");
+        }
+        cluster.trackTransaction(this);
+
+        try {
+            // Send begin - no txId attached yet
+            final ResultSet rs = submitInternal("g.tx().begin()");
+            
+            // Server returns the transaction ID
+            this.transactionId = extractTransactionId(rs);
+            this.state = TransactionState.OPEN;
+        } catch (Exception e) {
+            cleanUp();
+            throw new RuntimeException("Failed to begin transaction: " + 
e.getMessage(), e);
+        }
+
+        // Create RemoteConnection for the traversal source
+        final TransactionRemoteConnection txConnection = new 
TransactionRemoteConnection(this);
+
+        try {
+            return 
traversalSourceClass.getConstructor(RemoteConnection.class).newInstance(txConnection);
+        } catch (Exception e) {
+            rollback();
+            throw new IllegalStateException("Failed to create 
TraversalSource", e);
+        }
+    }
+
+    /**
+     * Extracts the transaction ID from the begin() response.
+     * <p>
+     * The server returns the transaction ID as part of the response to 
g.tx().begin().
+     *
+     * @param rs the result set from the begin request
+     * @return the transaction ID
+     */
+    private String extractTransactionId(final ResultSet rs) {
+        // Wait for all results and extract the transaction ID
+        final List<Result> results = rs.all().join();
+        if (results.isEmpty()) {
+            throw new IllegalStateException("Server did not return transaction 
ID");
+        }
+        // The transaction ID is returned as the result of g.tx().begin()
+        final Object id = results.get(0).get(Map.class).get("transactionId");
+        if (id == null) throw new IllegalStateException("Server did not return 
transaction ID");
+
+        final String idStr = id.toString();
+        if (idStr.isBlank()) throw new IllegalStateException("Server returned 
empty transaction ID");
+
+        return idStr;
+    }
+
+    /**
+     * Commits the transaction.
+     * <p>
+     * Sends {@code g.tx().commit()} to the server and closes the transaction.
+     *
+     * @throws IllegalStateException if the transaction is not open
+     * @throws RuntimeException if the commit fails
+     */
+    @Override
+    public void commit() {
+        closeRemoteTransaction("g.tx().commit()");
+    }
+
+    /**
+     * Rolls back the transaction.
+     * <p>
+     * Sends {@code g.tx().rollback()} to the server and closes the 
transaction.
+     * This is best-effort - errors are logged but not thrown.
+     */
+    @Override
+    public void rollback() {
+        closeRemoteTransaction("g.tx().rollback()");
+    }
+
+    private void closeRemoteTransaction(final String closeScript) {
+        if (state != TransactionState.OPEN) throw new 
IllegalStateException("Transaction is not open");
+
+        try {
+            submitInternal(closeScript).all().get(CLOSING_MAX_WAIT_MS, 
TimeUnit.MILLISECONDS);
+            cleanUp();
+        } catch (Exception e) {
+            logger.warn("Failed to {} transaction on {}", closeScript, 
pinnedHost);
+            throw new TransactionException("Failed to " + closeScript, e);
+        }
+    }
+
+    private void cleanUp() {
+        state = TransactionState.CLOSED;
+        cluster.untrackTransaction(this);
+    }
+
+    /**
+     * Returns the server-generated transaction ID, or {@code null} if the 
transaction
+     * has not yet been started via {@link #begin(Class)}.
+     *
+     * @return the transaction ID, or null if not yet begun
+     */
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return state == TransactionState.OPEN;
+    }
+
+    @Override
+    public void readWrite() {
+        throw new UnsupportedOperationException("Remote transaction behaviors 
are not configurable - they are always manually controlled");
+    }
+
+    @Override
+    public void close() {
+        closeConsumer.accept(this);
+        
+        // this is just for safety in case of custom closeConsumer but should 
normally be handled by commit/rollback
+        cleanUp();
+    }
+
+    @Override
+    public Transaction onReadWrite(final Consumer<Transaction> consumer) {
+        throw new UnsupportedOperationException("Remote transaction behaviors 
are not configurable - they are always manually controlled");
+    }
+
+    @Override
+    public Transaction onClose(final Consumer<Transaction> consumer) {
+        this.closeConsumer = consumer;
+        return this;
+    }
+
+    @Override
+    public void addTransactionListener(final Consumer<Status> listener) {
+        throw new UnsupportedOperationException("Remote transactions cannot 
have listeners attached");
+    }
+
+    @Override
+    public void removeTransactionListener(final Consumer<Status> listener) {
+        throw new UnsupportedOperationException("Remote transactions cannot 
have listeners attached");
+    }
+
+    @Override
+    public void clearTransactionListeners() {
+        throw new UnsupportedOperationException("Remote transactions cannot 
have listeners attached");
+    }
+
+    @Override
+    public ResultSet submit(final String gremlin) {
+        return submit(gremlin, RequestOptions.EMPTY);
+    }
+
+    @Override
+    public ResultSet submit(final String gremlin, final Map<String, Object> 
parameters) {
+        final RequestOptions.Builder builder = RequestOptions.build();
+        if (parameters != null && !parameters.isEmpty()) {
+            parameters.forEach(builder::addParameter);
+        }
+        return submit(gremlin, builder.create());
+    }
+
+    @Override
+    public ResultSet submit(final String gremlin, final RequestOptions 
options) {
+        if (state != TransactionState.OPEN) {
+            throw new IllegalStateException("Transaction is not open");
+        }
+        return submitInternal(gremlin, options);
+    }
+
+    private ResultSet submitInternal(final String gremlin) {
+        return submitInternal(gremlin, RequestOptions.EMPTY);
+    }
+
+    // synchronized here is a bit defensive but ensures that even if a user 
accidentally uses this in different threads,
+    // the server will still receive the requests in the correct order
+    private synchronized ResultSet submitInternal(final String gremlin, final 
RequestOptions options) {
+        final RequestOptions.Builder builder = 
RequestOptions.Builder.from(options);
+        if (graphAlias != null) {
+            // Don't allow per-request override of "g" as transactions should 
only target a single Graph instance.
+            builder.addG(graphAlias);
+        }
+
+        // Attach txId if we have one (not present for begin())
+        if (transactionId != null) {
+            builder.transactionId(transactionId);
+        }
+
+        try {
+            return pinnedClient.submit(gremlin, builder.create());
+        } catch (Exception e) {
+            throw new RuntimeException("Transaction request failed: " + 
e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java
new file mode 100644
index 0000000000..0f9e903dd8
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.GremlinLang;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.tinkerpop.gremlin.driver.RequestOptions.getRequestOptions;
+
+/**
+ * A {@link RemoteConnection} that routes all submissions through an {@link 
HttpRemoteTransaction}.
+ * <p>
+ * This connection adapts the synchronous transaction API to the async {@link 
RemoteConnection}
+ * interface required by {@link 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource}.
+ * <p>
+ * Key characteristics:
+ * <ul>
+ *   <li>Submissions are synchronous internally (via {@link 
HttpRemoteTransaction#submit})</li>
+ *   <li>Returns completed futures to satisfy the async interface</li>
+ *   <li>Transaction ID is automatically attached by the transaction</li>
+ * </ul>
+ */
+class TransactionRemoteConnection implements RemoteConnection {
+
+    private final HttpRemoteTransaction transaction;
+
+    /**
+     * Creates a new connection bound to the specified transaction.
+     *
+     * @param transaction the transaction that owns this connection
+     */
+    TransactionRemoteConnection(final HttpRemoteTransaction transaction) {
+        this.transaction = transaction;
+    }
+
+    /**
+     * Submits a traversal through the transaction.
+     * <p>
+     * The submission is synchronous internally but returns a completed future
+     * to satisfy the {@link RemoteConnection} interface.
+     *
+     * @param gremlinLang the traversal to submit
+     * @param <E> the type of elements returned by the traversal
+     * @return a completed future with the traversal results
+     * @throws RemoteConnectionException if the transaction is not open or 
submission fails
+     */
+    @Override
+    public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final 
GremlinLang gremlinLang)
+            throws RemoteConnectionException {
+        if (!transaction.isOpen()) {
+            throw new RemoteConnectionException("Transaction is not open");
+        }
+
+        try {
+            // Synchronous submission through transaction
+            final ResultSet rs = transaction.submit(gremlinLang.getGremlin(), 
getRequestOptions(gremlinLang));
+
+            final RemoteTraversal<?, E> traversal = new 
DriverRemoteTraversal<>(rs,
+                null,   // client not needed for iteration
+                false,  // attachElements
+                Optional.empty());
+
+            return CompletableFuture.completedFuture(traversal);
+        } catch (Exception e) {
+            throw new RemoteConnectionException(e);
+        }
+    }
+
+    /**
+     * Returns the owning transaction.
+     *
+     * @return the transaction that owns this connection
+     */
+    @Override
+    public Transaction tx() {
+        return transaction;
+    }
+
+    /**
+     * No-op close implementation.
+     * <p>
+     * The transaction manages its own lifecycle - users should call
+     * {@link Transaction#commit()} or {@link Transaction#rollback()} 
explicitly.
+     */
+    @Override
+    public void close() {
+        // Transaction manages its own lifecycle - don't close it here
+    }
+}
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000000..23443c1b3e
--- /dev/null
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link Client} subclasses verifying the refactored host 
selection
+ * and connection pool ownership model.
+ */
+public class ClientTest {
+
+    /**
+     * ClusteredClient.selectHost() should return the host from the load 
balancing strategy.
+     */
+    @Test
+    public void shouldSelectHostViaLoadBalancingStrategy() {
+        final Host expectedHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        
when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+
+        final Client.ClusteredClient client = new 
Client.ClusteredClient(cluster);
+        final RequestMessage msg = RequestMessage.build("g.V()").create();
+
+        final Host selected = client.selectHost(msg);
+        assertSame(expectedHost, selected);
+    }
+
+    /**
+     * When the load balancing strategy returns an empty iterator, 
ClusteredClient.selectHost()
+     * should fall back to Cluster.randomHost().
+     */
+    @Test
+    public void shouldFallbackToRandomHostWhenStrategyReturnsEmpty() {
+        final Host fallbackHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        when(strategy.select(any())).thenReturn(Collections.emptyIterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+        when(cluster.randomHost()).thenReturn(fallbackHost);
+
+        final Client.ClusteredClient client = new 
Client.ClusteredClient(cluster);
+        final RequestMessage msg = RequestMessage.build("g.V()").create();
+
+        final Host selected = client.selectHost(msg);
+        assertSame(fallbackHost, selected);
+    }
+
+    /**
+     * PinnedClient.selectHost() should always return the pinned host 
regardless of the message.
+     */
+    @Test
+    public void shouldAlwaysReturnPinnedHost() {
+        final Host pinnedHost = mock(Host.class);
+        final Cluster cluster = mock(Cluster.class);
+
+        final Client.PinnedClient client = new Client.PinnedClient(cluster, 
pinnedHost);
+
+        final RequestMessage msg1 = RequestMessage.build("g.V()").create();
+        final RequestMessage msg2 = RequestMessage.build("g.E()").create();
+
+        assertSame(pinnedHost, client.selectHost(msg1));
+        assertSame(pinnedHost, client.selectHost(msg2));
+        assertSame(pinnedHost, client.selectHost(null));
+    }
+
+    /**
+     * AliasClusteredClient.selectHost() should delegate to the underlying 
client's selectHost().
+     */
+    @Test
+    public void shouldDelegateSelectHostInAliasClient() {
+        final Host expectedHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        
when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+
+        final Client.ClusteredClient underlying = new 
Client.ClusteredClient(cluster);
+        final Client.AliasClusteredClient aliasClient = new 
Client.AliasClusteredClient(underlying, "g");
+
+        final RequestMessage msg = RequestMessage.build("g.V()").create();
+        final Host selected = aliasClient.selectHost(msg);
+        assertSame(expectedHost, selected);
+    }
+
+    /**
+     * ClusteredClient.closeAsync() should return a completed future without 
closing any pools.
+     */
+    @Test
+    public void shouldNotClosePoolsOnClusteredClientClose() {
+        final Cluster cluster = mock(Cluster.class);
+        final Client.ClusteredClient client = new 
Client.ClusteredClient(cluster);
+
+        assertFalse(client.isClosing());
+
+        final java.util.concurrent.CompletableFuture<Void> future = 
client.closeAsync();
+        assertNotNull(future);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertTrue(client.isClosing());
+
+        // calling again should return the same future
+        assertSame(future, client.closeAsync());
+    }
+
+    /**
+     * PinnedClient.closeAsync() should return a completed future without 
closing any pools.
+     */
+    @Test
+    public void shouldNotClosePoolsOnPinnedClientClose() {
+        final Host host = mock(Host.class);
+        final Cluster cluster = mock(Cluster.class);
+        final Client.PinnedClient client = new Client.PinnedClient(cluster, 
host);
+
+        assertFalse(client.isClosing());
+
+        final java.util.concurrent.CompletableFuture<Void> future = 
client.closeAsync();
+        assertNotNull(future);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertTrue(client.isClosing());
+
+        // calling again should return the same future
+        assertSame(future, client.closeAsync());
+    }
+}
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
new file mode 100644
index 0000000000..c35116f020
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
@@ -0,0 +1,1059 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for HTTP remote transactions using the driver API ({@code 
Cluster.transact()}).
+ * <p>
+ * Tests exercise {@link HttpRemoteTransaction} directly via its {@code 
submit()} methods
+ * rather than through {@link GraphTraversalSource}, which has its own unit 
tests.
+ * <p>
+ * Tests are numbered according to the HTTP Remote Transaction Test Plan.
+ * Server-side verification tests (raw HTTP) are in {@link 
GremlinServerHttpTransactionIntegrateTest}.
+ */
+public class GremlinDriverTransactionIntegrateTest extends 
AbstractGremlinServerIntegrationTest {
+    private static final Logger logger = 
LoggerFactory.getLogger(GremlinDriverTransactionIntegrateTest.class);
+
+    private static final String GTX = "gtx";
+
+    private Cluster cluster;
+
+    @Before
+    public void openCluster() {
+        cluster = TestClientFactory.open();
+    }
+
+    @After
+    public void closeCluster() throws Exception {
+        if (cluster != null) cluster.close();
+    }
+
+    /**
+     * Polls the given condition until it returns {@code true} or the timeout 
expires.
+     * Preferred over {@code Thread.sleep()} for timeout-dependent assertions 
to avoid
+     * flakiness in slower CI environments.
+     */
+    private static void awaitCondition(final long timeoutMs, final long pollMs,
+                                       final Callable<Boolean> condition) 
throws Exception {
+        final long deadline = System.currentTimeMillis() + timeoutMs;
+        while (System.currentTimeMillis() < deadline) {
+            if (condition.call()) return;
+            Thread.sleep(pollMs);
+        }
+        throw new AssertionError("Condition not met within " + timeoutMs + 
"ms");
+    }
+
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+        final String nameOfTest = name.getMethodName();
+        switch (nameOfTest) {
+            case "shouldEnforceMaxConcurrentTransactions_22":
+                settings.maxConcurrentTransactions = 1;
+                break;
+            case "shouldTimeoutIdleTransaction_23":
+            case "shouldTimeoutIdleTransactionWithNoOperations_30":
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldTimeoutOnlyIdleTransactionNotActiveOne_31":
+                settings.transactionTimeout = 2000;
+                break;
+            case "shouldRejectLateCommitAfterTimeout_32":
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34":
+                settings.maxConcurrentTransactions = 1;
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldCommitBeforeTimeoutWins_37":
+                settings.transactionTimeout = 3000;
+                break;
+            case "shouldTimeoutAbandonedTransaction_39":
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldTrackTransactionCountAccurately_33":
+                settings.transactionTimeout = 2000;
+                break;
+            case "shouldRollbackAllOnServerShutdown_35":
+                break;
+            case "shouldShutdownWithinBoundedTime_36":
+                settings.perGraphCloseTimeout = 2000;
+                break;
+        }
+        return settings;
+    }
+
+    /**
+     * Begin a transaction, add a vertex, verify isolation, commit, verify 
isOpen transitions, and verify data persists
+     * after commit.
+     */
+    @Test
+    public void shouldCommitTransaction_1_4_6_7() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        // #4: isOpen true after begin
+        assertTrue(tx.isOpen());
+
+        tx.submit("g.addV('person').property('name','alice')");
+
+        // #6: uncommitted data not visible outside the transaction
+        assertEquals(0L, 
client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+        tx.commit();
+        // #4: isOpen false after commit
+        assertFalse(tx.isOpen());
+        // #1, #7: committed data visible to non-transactional reads
+        assertEquals(1L, 
client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+        client.close();
+    }
+
+    /**
+     * Combined test for #2, #5: Begin a transaction, add a vertex, rollback,
+     * verify isOpen is false and data is discarded.
+     */
+    @Test
+    public void shouldRollbackTransaction_2_5() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        assertTrue(tx.isOpen());
+
+        tx.submit("g.addV('person').property('name','bob')");
+
+        tx.rollback();
+        // #5: isOpen false after rollback
+        assertFalse(tx.isOpen());
+        // #2: data discarded after rollback
+        assertEquals(0L, 
client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+        client.close();
+    }
+
+    // 
=========================================================================
+    // B. Intra-Transaction Consistency — #3, #8, #9 combined
+    // 
=========================================================================
+
+    /**
+     * Combined test for #3, #8, #9: Begin a transaction, add vertices and 
edges as separate
+     * requests, verify read-your-own-writes within the transaction, commit, 
verify persistence.
+     */
+    @Test
+    public void shouldSupportIntraTransactionConsistency_3_8_9() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('test').property('name','A')");
+        // #8: read-your-own-writes — vertex A visible within the transaction
+        assertEquals(1L, 
tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong());
+
+        tx.submit("g.addV('test').property('name','B')");
+        
tx.submit("g.V().has('name','A').addE('knows').to(V().has('name','B'))");
+
+        // verify the full subgraph is visible within the transaction before 
commit
+        assertEquals(2L, 
tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong());
+        assertEquals(1L, 
tx.submit("g.V().outE('knows').count()").all().get().get(0).getLong());
+
+        tx.commit();
+
+        // #3: vertices and edges persist after commit
+        assertEquals(2L, 
client.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong());
+        assertEquals(1L, 
client.submit("g.E().hasLabel('knows').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #10: Submit on a committed transaction throws IllegalStateException.
+     */
+    @Test
+    public void shouldThrowOnSubmitAfterCommit_10() throws Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+
+        tx.submit("g.addV()");
+        tx.commit();
+
+        try {
+            tx.submit("g.V().count()");
+            fail("Expected exception on submit after commit");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    /**
+     * #11: Submit on a rolled-back transaction throws IllegalStateException.
+     */
+    @Test
+    public void shouldThrowOnSubmitAfterRollback_11() throws Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+
+        tx.submit("g.addV()");
+        tx.rollback();
+
+        try {
+            tx.submit("g.V().count()");
+            fail("Expected exception on submit after rollback");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    /**
+     * #12: Calling begin() twice on the same HttpRemoteTransaction throws 
IllegalStateException.
+     */
+    @Test
+    public void shouldThrowOnDoubleBegin_12() throws Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+
+        try {
+            tx.begin();
+            fail("Expected IllegalStateException on second begin()");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction already 
started"));
+        }
+    }
+
+    /**
+     * #13: Calling commit() when not open throws IllegalStateException.
+     */
+    @Test
+    public void shouldThrowOnCommitWhenNotOpen_13() throws Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        assertFalse(tx.isOpen());
+
+        try {
+            tx.commit();
+            fail("Expected IllegalStateException on commit when not open");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    /**
+     * #14: Calling rollback() when not open throws IllegalStateException.
+     */
+    @Test
+    public void shouldThrowOnRollbackWhenNotOpen_14() throws Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        assertFalse(tx.isOpen());
+
+        try {
+            tx.rollback();
+            fail("Expected IllegalStateException on rollback when not open");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    /**
+     * #15: getTransactionId() returns null on a freshly created, 
not-yet-begun transaction,
+     * and returns a non-null, non-blank value after begin().
+     */
+    @Test
+    public void shouldReturnNullTransactionIdBeforeBegin_15() throws Exception 
{
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+
+        // before begin, transactionId should be null
+        assertNull(tx.getTransactionId());
+
+        tx.begin();
+        // after begin, transactionId should be non-null and non-blank
+        assertNotNull(tx.getTransactionId());
+        assertFalse(tx.getTransactionId().isBlank());
+    }
+
+    /**
+     * #16: Transaction.close() triggers the closeConsumer (default: commit).
+     */
+    @Test
+    public void shouldCommitOnCloseByDefault_16() throws Exception {
+        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx1.begin();
+        tx1.submit("g.addV('person').property('name','close_test')");
+        // close() should trigger default COMMIT behavior
+        tx1.close();
+        assertFalse(tx1.isOpen());
+
+        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx2.begin();
+        assertEquals(1L, 
tx2.submit("g.V().hasLabel('person').count()").one().getLong());
+    }
+
+    /**
+     * #17: Transaction.onClose(ROLLBACK) then close() triggers rollback 
instead.
+     */
+    @Test
+    public void shouldRollbackOnCloseWhenConfigured_17() throws Exception {
+        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx1.onClose(Transaction.CLOSE_BEHAVIOR.ROLLBACK);
+        tx1.begin();
+        tx1.submit("g.addV('person').property('name','rollback_close_test')");
+        tx1.close();
+        assertFalse(tx1.isOpen());
+
+        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx2.begin();
+        assertEquals(0L, 
tx2.submit("g.V().hasLabel('person').count()").one().getLong());
+    }
+
+    /**
+     * #18: Cluster.close() rolls back all open transactions (best-effort).
+     */
+    @Test
+    public void shouldRollbackOpenTransactionsOnClusterClose_18() throws 
Exception {
+        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx1.begin();
+        tx1.submit("g.addV('cluster-close')");
+        tx1.submit("g.addV('cluster-close')");
+
+        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx2.begin();
+        tx2.submit("g.addV('cluster-close')");
+
+        // close cluster without committing — should rollback open transactions
+        cluster.close();
+        cluster = null;
+
+        // reconnect and verify data was not persisted
+        final Cluster cluster2 = TestClientFactory.open();
+        try {
+            final HttpRemoteTransaction cluster2tx1 = cluster2.transact(GTX);
+            assertEquals(0L, 
cluster2tx1.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong());
+        } finally {
+            cluster2.close();
+        }
+    }
+
+    /**
+     * #22: Server enforces maxConcurrentTransactions — returns error when 
limit exceeded.
+     * Configured with maxConcurrentTransactions=1.
+     */
+    @Test
+    public void shouldEnforceMaxConcurrentTransactions_22() throws Exception {
+        // first transaction fills the single slot
+        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx1.begin();
+
+        // second transaction should fail
+        try {
+            final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+            tx2.begin();
+            fail("Expected exception when max concurrent transactions 
exceeded");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Maximum concurrent 
transactions exceeded"));
+        }
+    }
+
+    /**
+     * #23: Server-side transaction timeout — idle transaction auto-rolls back 
after configured timeout.
+     * Configured with transactionTimeout=1000.
+     */
+    @Test
+    public void shouldTimeoutIdleTransaction_23() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('timeout_test')");
+
+        // wait for the transaction to timeout
+        Thread.sleep(2000);
+
+        // the transaction was rolled back server-side; attempting to commit 
should fail
+        try {
+            tx.commit();
+            fail("Expected exception on commit after server-side timeout");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+
+        // verify data was not persisted
+        assertEquals(0L, 
client.submit("g.V().hasLabel('timeout_test').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #30: Idle transaction with no operations times out.
+     * Configured with transactionTimeout=1000.
+     */
+    @Test
+    public void shouldTimeoutIdleTransactionWithNoOperations_30() throws 
Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+
+        // wait for the transaction to timeout
+        Thread.sleep(2000);
+
+        // attempting to commit should fail because the server rolled back
+        try {
+            tx.commit();
+            fail("Expected exception on commit after server-side timeout");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+    }
+
+    /**
+     * #31: Concurrent transaction timeout independence — keep one active, 
leave the other idle.
+     * Only the idle one should timeout. Configured with 
transactionTimeout=2000.
+     */
+    @Test
+    public void shouldTimeoutOnlyIdleTransactionNotActiveOne_31() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction txActive = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        txActive.begin();
+        txActive.submit("g.addV('active')");
+
+        final HttpRemoteTransaction txIdle = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        txIdle.begin();
+        txIdle.submit("g.addV('idle')");
+
+        // keep the active transaction alive by sending requests at intervals 
shorter than timeout
+        for (int i = 0; i < 3; i++) {
+            Thread.sleep(800);
+            txActive.submit("g.V().count()");
+        }
+
+        // by now the idle transaction should have timed out (2000ms elapsed)
+        // the active transaction should still be alive
+        txActive.commit();
+
+        // verify active transaction's data persisted
+        assertEquals(1L, 
client.submit("g.V().hasLabel('active').count()").all().get().get(0).getLong());
+
+        // idle transaction should have been rolled back by timeout
+        try {
+            txIdle.commit();
+            fail("Expected exception on commit of timed-out idle transaction");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+
+        // verify idle transaction's data was not persisted
+        assertEquals(0L, 
client.submit("g.V().hasLabel('idle').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #32: Late commit after timeout — timeout fires and rolls back, then 
client sends commit.
+     * Configured with transactionTimeout=1000.
+     */
+    @Test
+    public void shouldRejectLateCommitAfterTimeout_32() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('person').property('name','late_commit')");
+
+        // wait for timeout
+        Thread.sleep(2000);
+
+        // attempt commit — should fail because server already rolled back
+        try {
+            tx.commit();
+            fail("Expected exception on late commit after timeout");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+
+        // verify data was not persisted
+        assertEquals(0L, 
client.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #34: Timeout frees a slot under maxConcurrentTransactions.
+     * Configured with maxConcurrentTransactions=1 and transactionTimeout=1000.
+     */
+    // TODO: get rid of this? it doesn't even have a driver side assert
+//    @Test
+//    public void shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34() 
throws Exception {
+//        // fill the single slot
+//        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx1.begin();
+//
+//        // wait for timeout to reclaim the slot
+//        Thread.sleep(2000);
+//
+//        // now a new transaction should succeed
+//        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx2.begin();
+//        assertTrue(tx2.isOpen());
+//        tx2.rollback();
+//    }
+
+    /**
+     * #37: Commit queued before timeout wins — set a timeout, begin tx, add 
data,
+     * and immediately commit. The commit should succeed if it arrives before 
the timeout fires.
+     * Configured with transactionTimeout=3000 to avoid flakiness.
+     */
+    // TODO: this just seems like a bad way to test this, consider removing or 
improving
+//    @Test
+//    public void shouldCommitBeforeTimeoutWins_37() throws Exception {
+//        final Client client = cluster.connect();
+//        final RequestOptions gtxOptions = 
RequestOptions.build().addG(GTX).create();
+//
+//        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx.begin();
+//
+//        tx.submit("g.addV('person').property('name','fast_commit')");
+//
+//        // commit immediately — should succeed before timeout fires
+//        tx.commit();
+//
+//        assertFalse(tx.isOpen());
+//        assertEquals(1L, client.submit("g.V().hasLabel('person').count()", 
gtxOptions)
+//                .all().get().get(0).getLong());
+//    }
+
+    /**
+     * #39: Client disappears without commit or rollback — server-side 
transaction eventually
+     * times out and rolls back. Configured with transactionTimeout=1000.
+     */
+    // TODO: consider transfering this to "server test"
+//    @Test
+//    public void shouldTimeoutAbandonedTransaction_39() throws Exception {
+//        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx.begin();
+//        tx.submit("g.addV('person').property('name','abandoned')");
+//
+//        // close cluster without committing
+//        cluster.close();
+//        cluster = null;
+//
+//        // wait for server-side timeout
+//        Thread.sleep(2000);
+//
+//        // reconnect and verify data was not persisted
+//        final Cluster cluster2 = TestClientFactory.open();
+//        try {
+//            final Client client2 = cluster2.connect();
+//            assertEquals(0L, 
client2.submit("g.V().hasLabel('person').count()",
+//                    
RequestOptions.build().addG(GTX).create()).all().get().get(0).getLong());
+//        } finally {
+//            cluster2.close();
+//        }
+//    }
+
+    /**
+     * #48: Multiple concurrent transactions (different transaction IDs) don't 
interfere.
+     */
+    @Test
+    public void shouldIsolateConcurrentTransactions_48() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx1.begin();
+        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx2.begin();
+
+        tx1.submit("g.addV('tx1')");
+        tx2.submit("g.addV('tx2')");
+
+        // tx1 should not see tx2's data and vice versa
+        assertEquals(0L, 
tx1.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong());
+        assertEquals(0L, 
tx2.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong());
+
+        tx1.commit();
+        tx2.commit();
+
+        // both should be visible after commit
+        assertEquals(1L, 
client.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong());
+        assertEquals(1L, 
client.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #49: Open and close many transactions sequentially (stress test for 
resource cleanup).
+     */
+    @Test
+    public void shouldOpenAndCloseManyTransactionsSequentially_49() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+        final long numberOfTransactions = 50;
+
+        for (int i = 0; i < numberOfTransactions; i++) {
+            final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+            tx.begin();
+            tx.submit("g.addV('stress')");
+            tx.commit();
+        }
+
+        final long count = 
client.submit("g.V().hasLabel('stress').count()").all().get().get(0).getLong();
+        assertEquals(numberOfTransactions, count);
+
+        Thread.sleep(100);
+        // this should be 0, but to prevent flakiness, make it a reasonable 
number less than numberOfTransactions
+        
assertTrue(server.getServerGremlinExecutor().getTransactionManager().getActiveTransactionCount()
 < 35);
+    }
+
+    /**
+     * #50: Reuse transaction across threads — synchronized submitInternal 
serializes correctly.
+     */
+    // TODO: consider removing as this isn't an advertised functionality. its 
not advertised to be thread-safe.
+//    @Test
+//    public void shouldSerializeSubmitsAcrossThreads_50() throws Exception {
+//        final Client client = cluster.connect();
+//        final RequestOptions gtxOptions = 
RequestOptions.build().addG(GTX).create();
+//
+//        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx.begin();
+//
+//        final int threadCount = 8;
+//        final CountDownLatch latch = new CountDownLatch(threadCount);
+//        final AtomicBoolean failed = new AtomicBoolean(false);
+//
+//        for (int i = 0; i < threadCount; i++) {
+//            final int idx = i;
+//            new Thread(() -> {
+//                try {
+//                    tx.submit("g.addV('threaded').property('idx'," + idx + 
")");
+//                } catch (Exception ex) {
+//                    logger.error("Thread {} failed", idx, ex);
+//                    failed.set(true);
+//                } finally {
+//                    latch.countDown();
+//                }
+//            }).start();
+//        }
+//
+//        assertTrue("Threads did not complete in time", latch.await(30, 
TimeUnit.SECONDS));
+//        assertFalse("One or more threads failed", failed.get());
+//
+//        tx.commit();
+//
+//        assertEquals((long) threadCount,
+//                client.submit("g.V().hasLabel('threaded').count()", 
gtxOptions)
+//                        .all().get().get(0).getLong());
+//    }
+
+    /**
+     * #52: Interleaved transactional and non-transactional requests on the 
same Cluster.
+     */
+    // TODO: continue here
+    @Test
+    public void shouldIsolateTransactionalAndNonTransactionalRequests_52() 
throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('tx_data')");
+
+        // non-transactional read should not see uncommitted tx data
+        assertEquals(0L, 
client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong());
+
+        tx.commit();
+
+        // now the data should be visible
+        assertEquals(1L, 
client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #53: Begin on a graph that doesn't support transactions fails at begin 
time.
+     */
+    @Test
+    public void shouldRejectBeginOnNonTransactionalGraph_53() throws Exception 
{
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact("gclassic");
+        try {
+            tx.begin();
+            fail("Expected exception when beginning transaction on 
non-transactional graph");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transactions not 
supported"));
+        }
+    }
+
+    /**
+     * #54: Transaction targets the correct graph — vertex committed on gtx 
should not
+     * appear in gclassic.
+     */
+    @Test
+    public void shouldTargetCorrectGraph_54() throws Exception {
+        final Client client = cluster.connect();
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+
+        tx.submit("g.addV('routed')");
+        tx.commit();
+
+        // vertex should exist in the transactional graph (gtx)
+        assertEquals(1L, client.submit("g.V().hasLabel('routed').count()",
+                
RequestOptions.build().addG(GTX).create()).all().get().get(0).getLong());
+
+        // vertex should NOT exist in the classic graph (gclassic)
+        assertEquals(0L, client.submit("g.V().hasLabel('routed').count()",
+                
RequestOptions.build().addG("gclassic").create()).all().get().get(0).getLong());
+    }
+
+    /**
+     * #56: Non-transactional write is auto-committed.
+     */
+    @Test
+    public void shouldAutoCommitNonTransactionalWrite_56() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        client.submit("g.addV('auto')").all().get();
+
+        assertEquals(1L, 
cluster.transact(GTX).submit("g.V().hasLabel('auto').count()").one().getLong());
+    }
+
+    /**
+     * #57: Non-transactional write is not affected by an open transaction on 
the same graph.
+     */
+    @Test
+    public void shouldNotAffectNonTxWriteWhenTxRolledBack_57() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        // add vertex A in the transaction (don't commit)
+        tx.submit("g.addV('tx_vertex')");
+
+        // add vertex B non-transactionally
+        client.submit("g.addV('nontx_vertex')").all().get();
+
+        // vertex B should be immediately visible
+        assertEquals(1L, 
client.submit("g.V().hasLabel('nontx_vertex').count()")
+                .all().get().get(0).getLong());
+
+        // rollback the transaction
+        tx.rollback();
+
+        // vertex A should be gone, vertex B should persist
+        assertEquals(0L, 
client.submit("g.V().hasLabel('tx_vertex').count()").one().getLong());
+        assertEquals(1L, 
client.submit("g.V().hasLabel('nontx_vertex').count()").one().getLong());
+    }
+
+    // TODO: Add test for submit(String, RequestOptions) — e.g. pass custom 
RequestOptions with parameters
+    //       into a transaction and verify they are forwarded correctly 
alongside the transaction context.
+
+    // TODO: Add test for submit(String, Map<String, Object>) — e.g. pass a 
parameterized traversal like
+    //       "g.addV(label).property('name',name)" with a parameter map and 
verify it works within a transaction.
+
+    /**
+     * #58: Cluster.transact() returns an HttpRemoteTransaction, calling 
begin() opens it,
+     * submit() sends traversals, commit() finalizes.
+     */
+    // TODO: definitely get rid of this one right? its pretty much tested by 
10 other tests.
+//    @Test
+//    public void shouldWorkWithExplicitTransactionObject_58() throws 
Exception {
+//        final Client client = cluster.connect().alias(GTX);
+//
+//        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx.begin();
+//
+//        tx.submit("g.addV('explicit_tx').property('name','test')");
+//        tx.commit();
+//
+//        assertEquals(1L, 
client.submit("g.V().hasLabel('explicit_tx').count()", gtxOptions)
+//                .all().get().get(0).getLong());
+//    }
+
+    /**
+     * Combined test for #60, #61: Submit g.tx().commit() and 
g.tx().rollback() as
+     * gremlin-lang strings within a transaction via the HttpRemoteTransaction 
API.
+     */
+    // TODO: move this to the Client test file because it can't be done with 
HttpRemoteTransaction
+//    @Test
+//    public void shouldSupportScriptBasedTransactionControl_60_61() throws 
Exception {
+//        final Client client = cluster.connect().alias(GTX);
+//
+//        // #60: commit
+//        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx1.begin();
+//        tx1.submit("g.addV('script_commit')");
+//        tx1.commit();
+//
+//        assertEquals(1L, 
client.submit("g.V().hasLabel('script_commit').count()", gtxOptions)
+//                .all().get().get(0).getLong());
+//
+//        // #61: rollback
+//        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx2.begin();
+//        tx2.submit("g.addV('script_rollback')");
+//        tx2.rollback();
+//
+//        assertEquals(0L, 
client.submit("g.V().hasLabel('script_rollback').count()", gtxOptions)
+//                .all().get().get(0).getLong());
+//    }
+
+    /**
+     * #28: extractTransactionId() rejects empty/blank transaction IDs.
+     * <p>
+     * Verified indirectly: begin on a non-transactional graph fails 
server-side before
+     * a transactionId is ever assigned. The client cleans up properly.
+     */
+    // TODO: best tested with a mock server. look into updating the 
ClientBehavioralTests with this.
+//    @Test
+//    public void shouldRejectEmptyTransactionId_28() throws Exception {
+//        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact("gclassic");
+//
+//        try {
+//            tx.begin();
+//            fail("Expected exception when beginning on non-transactional 
graph");
+//        } catch (Exception ex) {
+//            // expected — server rejects the begin
+//        }
+//
+//        // transaction should be cleaned up
+//        assertFalse(tx.isOpen());
+//        assertNull(tx.getTransactionId());
+//    }
+
+    /**
+     * #29: If begin() fails, the client cleans up via closeInternal() — 
transaction moves
+     * to CLOSED state, isOpen() is false, getTransactionId() is null, and a 
second begin()
+     * throws because the state is no longer NOT_STARTED.
+     */
+    @Test
+    public void shouldCleanUpOnBeginFailure_29() throws Exception {
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact("gclassic");
+
+        try {
+            tx.begin();
+            fail("Expected exception on begin for non-transactional graph");
+        } catch (RuntimeException ex) {
+            assertThat(ex.getMessage(), containsString("Failed to begin 
transaction"));
+        }
+
+        // verify cleanup: transaction is closed and has no ID
+        assertFalse(tx.isOpen());
+        assertNull(tx.getTransactionId());
+
+        // second begin should fail — state moved to CLOSED, not back to 
NOT_STARTED
+        try {
+            tx.begin();
+            fail("Expected IllegalStateException on begin after failed begin");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    /**
+     * #62: If a traversal within a transaction throws an error, the 
transaction remains
+     * open for further operations or explicit rollback.
+     */
+    @Test
+    public void shouldKeepTransactionOpenAfterTraversalError_62() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('good_vertex')");
+
+        // submit a bad traversal that should fail
+        try {
+            tx.submit("g.V().fail()");
+        } catch (Exception ex) {
+            // expected error from bad traversal
+        }
+
+        // transaction should still be open — rollback should work
+        assertTrue(tx.isOpen());
+        tx.rollback();
+
+        assertFalse(tx.isOpen());
+        assertEquals(0L, 
client.submit("g.V().hasLabel('good_vertex').count()").all().get().get(0).getLong());
+    }
+
+    /**
+     * #33: Transaction count tracking — open N transactions, commit some, 
rollback others,
+     * let one timeout. After all are resolved, verify the active count is 
zero.
+     * Configured with transactionTimeout=2000.
+     */
+    // TODO: consider removing as this is server-test. make sure its covered 
in other test file then delete.
+//    @Test
+//    public void shouldTrackTransactionCountAccurately_33() throws Exception {
+//        final TransactionManager txManager = 
server.getServerGremlinExecutor().getTransactionManager();
+//
+//        assertEquals(0, txManager.getActiveTransactionCount());
+//
+//        // open 3 transactions
+//        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx1.begin(GraphTraversalSource.class);
+//
+//        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx2.begin(GraphTraversalSource.class);
+//
+//        final HttpRemoteTransaction tx3 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx3.begin(GraphTraversalSource.class);
+//
+//        assertEquals(3, txManager.getActiveTransactionCount());
+//
+//        // commit one
+//        tx1.commit();
+//        assertEquals(2, txManager.getActiveTransactionCount());
+//
+//        // rollback one
+//        tx2.rollback();
+//        assertEquals(1, txManager.getActiveTransactionCount());
+//
+//        // let the third one timeout (configured at 2000ms)
+//        Thread.sleep(3000);
+//        assertEquals(0, txManager.getActiveTransactionCount());
+//    }
+
+    /**
+     * #35: Server shutdown with multiple open transactions — open several 
transactions with
+     * uncommitted data, trigger server shutdown. Verify uncommitted data is 
not persisted.
+     */
+    // TODO: like above, this is probably server-side test only, make sure it 
exists then delete.
+//    @Test
+//    public void shouldRollbackAllOnServerShutdown_35() throws Exception {
+//        final TransactionManager txManager = 
server.getServerGremlinExecutor().getTransactionManager();
+//
+//        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx1.begin(GraphTraversalSource.class);
+//        tx1.submit("g.addV('shutdown_test_1')");
+//
+//        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx2.begin(GraphTraversalSource.class);
+//        tx2.submit("g.addV('shutdown_test_2')");
+//
+//        final HttpRemoteTransaction tx3 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx3.begin(GraphTraversalSource.class);
+//        tx3.submit("g.addV('shutdown_test_3')");
+//
+//        assertEquals(3, txManager.getActiveTransactionCount());
+//
+//        cluster.close();
+//        cluster = null;
+//
+//        // shut down the server — TransactionManager.shutdown() should 
rollback all
+//        stopServer();
+//
+//        // restart the server
+//        startServer();
+//
+//        // reconnect and verify no data was persisted
+//        final Cluster cluster2 = TestClientFactory.open();
+//        try {
+//            final Client client2 = cluster2.connect();
+//
+//
+//            assertEquals(0L, 
client2.submit("g.V().hasLabel('shutdown_test_1').count()", gtxOptions)
+//                    .all().get().get(0).getLong());
+//            assertEquals(0L, 
client2.submit("g.V().hasLabel('shutdown_test_2').count()", gtxOptions)
+//                    .all().get().get(0).getLong());
+//            assertEquals(0L, 
client2.submit("g.V().hasLabel('shutdown_test_3').count()", gtxOptions)
+//                    .all().get().get(0).getLong());
+//
+//            assertEquals(0, 
server.getServerGremlinExecutor().getTransactionManager().getActiveTransactionCount());
+//        } finally {
+//            cluster2.close();
+//        }
+//    }
+
+    /**
+     * #36: Server shutdown completes within bounded time.
+     * Configured with perGraphCloseTimeout=2000.
+     */
+    // Again, server test, don't need it here, make sure it exists in 
server-test.
+//    @Test
+//    public void shouldShutdownWithinBoundedTime_36() throws Exception {
+//        final HttpRemoteTransaction tx1 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx1.begin(GraphTraversalSource.class);
+//        tx1.submit("g.addV('shutdown_timing')");
+//
+//        final HttpRemoteTransaction tx2 = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//        tx2.begin(GraphTraversalSource.class);
+//        tx2.submit("g.addV('shutdown_timing')");
+//
+//        cluster.close();
+//        cluster = null;
+//
+//        final long start = System.currentTimeMillis();
+//        stopServer();
+//        final long elapsed = System.currentTimeMillis() - start;
+//
+//        assertTrue("Shutdown took too long: " + elapsed + "ms", elapsed < 
60000);
+//
+//        // restart for tearDown
+//        startServer();
+//    }
+
+    /**
+     * #40: perGraphCloseTimeout bounds shutdown duration — verify shutdown 
does not hang
+     * beyond the configured timeout even with open transactions.
+     * Configured with perGraphCloseTimeout=2000.
+     */
+    // TODO: this probably should just get deleted. its too hard to test and 
not that valuable.
+//    @Test
+//    public void shouldBoundShutdownDurationWithOpenTransactions_40() throws 
Exception {
+//        final TransactionManager txManager = 
server.getServerGremlinExecutor().getTransactionManager();
+//
+//        for (int i = 0; i < 5; i++) {
+//            final HttpRemoteTransaction tx = (HttpRemoteTransaction) 
cluster.transact(GTX);
+//            tx.begin();
+//            tx.submit("g.addV('timeout_bound_" + i + "')");
+//        }
+//
+//        assertEquals(5, txManager.getActiveTransactionCount());
+//
+//        cluster.close();
+//        cluster = null;
+//
+//        final long start = System.currentTimeMillis();
+//        stopServer();
+//        final long elapsed = System.currentTimeMillis() - start;
+//
+//        assertTrue("Shutdown took too long: " + elapsed + "ms", elapsed < 
60000);
+//
+//        // restart for tearDown
+//        startServer();
+//    }
+}

Reply via email to