This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master-tx-client
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 495156538bd8d98a0184c3f09886589e3035ae6f
Author: Ken Hu <[email protected]>
AuthorDate: Mon Mar 16 17:08:46 2026 -0700

    server implementation of transactions
---
 .../apache/tinkerpop/gremlin/server/Context.java   |  11 +
 .../tinkerpop/gremlin/server/GremlinServer.java    |   5 +
 .../apache/tinkerpop/gremlin/server/Settings.java  |  18 +
 .../gremlin/server/channel/HttpChannelizer.java    |   4 +-
 .../server/handler/HttpGremlinEndpointHandler.java | 165 ++++-
 .../gremlin/server/handler/HttpHandlerUtil.java    |  37 +-
 .../server/handler/HttpRequestMessageDecoder.java  |   3 +
 .../gremlin/server/handler/TransactionManager.java | 179 +++++
 .../server/handler/UnmanagedTransaction.java       | 180 +++++
 .../gremlin/server/util/GremlinError.java          |  78 +++
 .../gremlin/server/util/ServerGremlinExecutor.java |  13 +
 .../GremlinServerHttpTransactionIntegrateTest.java | 764 +++++++++++++++++++++
 .../org/apache/tinkerpop/gremlin/util/Tokens.java  |   7 +
 .../gremlin/util/message/RequestMessage.java       |  15 +
 .../ser/AbstractGraphSONMessageSerializerV4.java   |   3 +
 .../util/ser/binary/RequestMessageSerializer.java  |   3 +
 16 files changed, 1457 insertions(+), 28 deletions(-)

diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 21b7c0b636..06e2507e75 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -56,6 +56,7 @@ public class Context {
     private ScheduledFuture<?> timeoutExecutor = null;
     private boolean timeoutExecutorGrabbed = false;
     private final Object timeoutExecutorLock = new Object();
+    private String transactionId; // initially null for non-transactional 
requests and begin() calls; set after transaction creation.
 
     public Context(final RequestMessage requestMessage, final 
ChannelHandlerContext ctx,
                    final Settings settings, final GraphManager graphManager,
@@ -80,6 +81,7 @@ public class Context {
         this.requestState = requestState;
         this.requestTimeout = determineTimeout();
         this.materializeProperties = determineMaterializeProperties();
+        this.transactionId = 
requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
     }
 
     public void setTimeoutExecutor(final ScheduledFuture<?> timeoutExecutor) {
@@ -119,6 +121,15 @@ public class Context {
         return scheduledExecutorService;
     }
 
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(final String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+
     /**
      * Gets the current request to Gremlin Server.
      */
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 1314af6225..16d8a2d045 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -293,6 +293,11 @@ public class GremlinServer {
                 logger.warn("Timeout waiting for boss/worker thread pools to 
shutdown - continuing with shutdown process.");
             }
 
+            if (serverGremlinExecutor != null) {
+                logger.info("Shutting down TransactionManager");
+                serverGremlinExecutor.getTransactionManager().shutdown();
+            }
+
             // close TraversalSource and Graph instances - there aren't 
guarantees that closing Graph will close all
             // spawned TraversalSource instances so both should be closed 
directly and independently.
             if (serverGremlinExecutor != null) {
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 5e56c76ed9..3489bd511e 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -184,6 +184,24 @@ public class Settings {
      */
     public boolean strictTransactionManagement = false;
 
+    /**
+     * Time in milliseconds that a transaction can remain idle before it is 
automatically rolled back.
+     * This prevents resource leaks from abandoned transactions. Default is 
600000 (10 minutes).
+     */
+    public long transactionTimeout = 600000L;
+
+    /**
+     * Time in milliseconds to wait for a transaction commit or rollback 
operation to complete.
+     * Default is 10000 (10 seconds).
+     */
+    public long perGraphCloseTimeout = 10000L;
+
+    /**
+     * Maximum number of concurrent transactions allowed on the server.
+     * Default is 1000.
+     */
+    public int maxConcurrentTransactions = 1000;
+
     /**
      * The full class name of the {@link Channelizer} to use in Gremlin Server.
      */
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index e90ae340bf..e5643f5154 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -35,6 +35,7 @@ import 
org.apache.tinkerpop.gremlin.server.handler.HttpRequestIdHandler;
 import org.apache.tinkerpop.gremlin.server.handler.HttpRequestMessageDecoder;
 import org.apache.tinkerpop.gremlin.server.handler.HttpUserAgentHandler;
 import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
+import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
@@ -61,7 +62,8 @@ public class HttpChannelizer extends AbstractChannelizer {
     @Override
     public void init(final ServerGremlinExecutor serverGremlinExecutor) {
         super.init(serverGremlinExecutor);
-        httpGremlinEndpointHandler = new 
HttpGremlinEndpointHandler(gremlinExecutor, graphManager, settings);
+        httpGremlinEndpointHandler = new HttpGremlinEndpointHandler(
+                gremlinExecutor, graphManager, settings, 
serverGremlinExecutor.getTransactionManager());
     }
 
     @Override
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index cbc9f2d616..86a9b3943b 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -27,9 +27,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
@@ -57,6 +55,7 @@ import 
org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
 import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
 import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
 import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
@@ -82,6 +81,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -92,6 +92,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import static com.codahale.metrics.MetricRegistry.name;
@@ -107,7 +108,8 @@ import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHan
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHING;
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.NOT_STARTED;
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.STREAMING;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendTrailingHeaders;
+import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendHttpResponse;
+import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendLastHttpContent;
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeError;
 
 /**
@@ -168,13 +170,16 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
     private final GremlinExecutor gremlinExecutor;
     private final GraphManager graphManager;
     private final Settings settings;
+    private final TransactionManager transactionManager;
 
     public HttpGremlinEndpointHandler(final GremlinExecutor gremlinExecutor,
                                       final GraphManager graphManager,
-                                      final Settings settings) {
+                                      final Settings settings,
+                                      final TransactionManager 
transactionManager) {
         this.gremlinExecutor = gremlinExecutor;
         this.graphManager = graphManager;
         this.settings = settings;
+        this.transactionManager = transactionManager;
     }
 
     @Override
@@ -210,18 +215,36 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                             requestMessage.getGremlin());
                 }
 
-                // Send back the 200 OK response header here since the 
response is always chunk transfer encoded. Any
-                // failures that follow this will show up in the response body 
instead.
-                final HttpResponse responseHeader = new 
DefaultHttpResponse(HTTP_1_1, OK);
-                if 
(acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING)))
 {
-                    responseHeader.headers().add(CONTENT_ENCODING, DEFLATE);
+                // These guards detect obvious failures before the 200 OK 
response header is sent, throwing before any
+                // other processing starts so that the user gets a more 
accurate error code.
+                final String txId = requestCtx.getTransactionId();
+                final String gremlin = requestMessage.getGremlin();
+                if (isTransactionBegin(gremlin)) {
+                    // If this is a begin transaction request then the 
Transaction ID must be created first: it is sent
+                    // in both the response header and the response body, so 
the header below needs to contain it.
+
+                    // This prevents accidentally re-opening the underlying 
transaction.
+                    if (txId != null) throw new 
ProcessingException(GremlinError.beginHasTransactionId());
+
+                    handleBegin(requestCtx);
+                } else if (txId != null) {
+                    // This check makes sure that the underlying Graph's 
transaction is already open, which stops a closed
+                    // transaction from re-opening due to the default autostart 
nature of transactions. This occurs when a
+                    // transactional traversal is submitted after a 
commit/rollback.
+                    final Graph g = 
graphManager.getTraversalSource(requestMessage.getField(Tokens.ARGS_G)).getGraph();
+                    if ((!g.tx().isOpen())) {
+                        throw new 
ProcessingException(GremlinError.transactionNotFound(txId));
+                    }
+                } else if ((txId == null) && (isTransactionCommit(gremlin) || 
isTransactionRollback(gremlin))) {
+                    // Logically, commit/rollback should only be allowed on a 
transactional request.
+                    throw new 
ProcessingException(GremlinError.transactionalControlRequiresTransaction());
                 }
-                responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
-                responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, 
serializer.getValue0());
-                ctx.writeAndFlush(responseHeader);
-                ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
 
-                iterateScriptEvalResult(requestCtx, serializer.getValue1(), 
requestMessage);
+                // Send back the 200 OK response header here since the 
response is always chunk transfer encoded. Any
+                // failures that follow this will show up in the response body 
instead.
+                sendHttpResponse(ctx, OK, createResponseHeaders(ctx, 
serializer, requestCtx).toArray(CharSequence[]::new));
+                sendHttpContents(ctx, requestCtx);
+                sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
             } catch (Throwable t) {
                 writeError(requestCtx, formErrorResponseMessage(t, 
requestMessage), serializer.getValue1());
             } finally {
@@ -240,7 +263,9 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         });
 
         try {
-            final Future<?> executionFuture = 
requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
+            final Future<?> executionFuture = (requestCtx.getTransactionId() 
!= null) ?
+                    
transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) :
+                    
requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
             if (seto > 0) {
                 // Schedule a timeout in the thread pool for future execution
                 
requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(()
 -> {
@@ -252,6 +277,47 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
             }
         } catch (RejectedExecutionException ree) {
             writeError(requestCtx, GremlinError.rateLimiting(), 
serializer.getValue1());
+        } catch (NoSuchElementException nsee) {
+            writeError(requestCtx, 
GremlinError.transactionNotFound(requestCtx.getTransactionId()), 
serializer.getValue1());
+        }
+    }
+
+    private List<CharSequence> createResponseHeaders(final 
ChannelHandlerContext ctx,
+                                                     final Pair<String, 
MessageSerializer<?>> serializer,
+                                                     final Context requestCtx) 
{
+        final List<CharSequence> headers = new ArrayList<>();
+        headers.add(HttpHeaderNames.CONTENT_TYPE);
+        headers.add(serializer.getValue0());
+        if 
(acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING)))
 {
+            headers.add(CONTENT_ENCODING);
+            headers.add(DEFLATE);
+        }
+        if (requestCtx.getTransactionId() != null) {
+            headers.add("X-Transaction-Id");
+            headers.add(requestCtx.getTransactionId());
+        }
+        return headers;
+    }
+
+    private void sendHttpContents(final ChannelHandlerContext ctx, final 
Context requestContext) throws Exception {
+        final Pair<String, MessageSerializer<?>> serializer = 
ctx.channel().attr(StateKey.SERIALIZER).get();
+        final RequestMessage request = requestContext.getRequestMessage();
+        final String txId = requestContext.getTransactionId();
+        final Optional<UnmanagedTransaction> transaction = 
transactionManager.get(txId);
+        final Graph graph = 
graphManager.getTraversalSource(request.getField(Tokens.ARGS_G)).getGraph();
+
+        // Early guard against fake or incorrect transaction IDs.
+        if ((txId != null) && transaction.isEmpty()) throw new 
ProcessingException(GremlinError.transactionNotFound(txId));
+
+        if (isTransactionBegin(request.getGremlin())) {
+            runBegin(requestContext, transaction.get(), serializer);
+        } else if (isTransactionCommit(request.getGremlin())) {
+            handleGraphOp(requestContext, txId, graph, Transaction::commit, 
serializer);
+        } else if 
(isTransactionRollback(requestContext.getRequestMessage().getGremlin())) {
+            handleGraphOp(requestContext, txId, graph, Transaction::rollback, 
serializer);
+        } else {
+            // Both transactional and non-transactional traversals follow this 
path for response chunking.
+            iterateScriptEvalResult(requestContext, serializer.getValue1(), 
request);
         }
     }
 
@@ -372,6 +438,70 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         }
     }
 
+    /**
+     * Detects if the gremlin script is a transaction begin command.
+     */
+    private boolean isTransactionBegin(final String gremlin) {
+        if (gremlin == null) return false;
+        return gremlin.trim().equalsIgnoreCase("g.tx().begin()");
+    }
+
+    /**
+     * Detects if the gremlin script is a transaction commit command.
+     */
+    private boolean isTransactionCommit(final String gremlin) {
+        if (gremlin == null) return false;
+        return gremlin.trim().equalsIgnoreCase("g.tx().commit()");
+    }
+
+    /**
+     * Detects if the gremlin script is a transaction rollback command.
+     */
+    private boolean isTransactionRollback(final String gremlin) {
+        if (gremlin == null) return false;
+        return gremlin.trim().equalsIgnoreCase("g.tx().rollback()");
+    }
+
+    /**
+     * Handles begin by creating an {@link UnmanagedTransaction}, submitting a 
task to it that opens the graph transaction, and waiting for that task to finish.
+     */
+    private void handleBegin(final Context ctx) throws Exception {
+        final String traversalSourceName = 
ctx.getRequestMessage().getField(Tokens.ARGS_G);
+
+        final UnmanagedTransaction txCtx;
+        try {
+            txCtx = transactionManager.create(traversalSourceName);
+            ctx.setTransactionId(txCtx.getTransactionId());
+            final Graph graph = 
graphManager.getTraversalSource(traversalSourceName).getGraph();
+            txCtx.submit(new FutureTask<>(() -> {
+                graph.tx().open();
+                return null;
+            })).get(); // TODO: institute timeout
+        } catch (IllegalStateException ise) {
+            throw new 
ProcessingException(GremlinError.maxTransactionsExceeded(ise.getMessage()));
+        } catch (IllegalArgumentException iae) {
+            throw new 
ProcessingException(GremlinError.binding(traversalSourceName));
+        } catch (UnsupportedOperationException uoe) {
+            throw new 
ProcessingException(GremlinError.transactionNotSupported(uoe));
+        }
+    }
+
+    private void runBegin(final Context ctx, UnmanagedTransaction tx, final 
Pair<String, MessageSerializer<?>> serializer) throws Exception {
+        final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), 
List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false, 
false);
+        ctx.getChannelHandlerContext().writeAndFlush(new 
DefaultHttpContent(chunk));
+    }
+
+    private void handleGraphOp(final Context ctx,
+                               final String transactionId,
+                               final Graph graph,
+                               final Consumer<Transaction> graphOp,
+                               final Pair<String, MessageSerializer<?>> 
serializer) throws Exception {
+        graphOp.accept(graph.tx());
+        transactionManager.destroy(transactionId);
+        final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), 
List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false);
+        ctx.getChannelHandlerContext().writeAndFlush(new 
DefaultHttpContent(chunk));
+    }
+
     private Bindings mergeBindingsFromRequest(final Context ctx, final 
Bindings bindings) throws ProcessingException {
         // alias any global bindings to a different variable.
         final RequestMessage msg = ctx.getRequestMessage();
@@ -431,7 +561,6 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                 // it needs to be released here
                 if (chunk != null) chunk.release();
             }
-            sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, "");
             return;
         }
 
@@ -518,10 +647,6 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                     }
 
                     nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
-
-                    if (!hasMore) {
-                        sendTrailingHeaders(nettyContext, 
HttpResponseStatus.OK, "");
-                    }
                 }
             } else {
                 final long currentTime = System.currentTimeMillis();
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index 1e84e35dde..782be1100c 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -97,6 +97,8 @@ public class HttpHandlerUtil {
      * @param serializer        The serializer to use to serialize the error 
response.
      */
     static void writeError(final Context context, final ResponseMessage 
responseMessage, final MessageSerializer<?> serializer) {
+        logger.debug("Regular path writing error: {}", responseMessage);
+
         try {
             final ChannelHandlerContext ctx = 
context.getChannelHandlerContext();
             final ByteBuf ByteBuf = context.getRequestState() == 
HttpGremlinEndpointHandler.RequestState.STREAMING
@@ -106,16 +108,14 @@ public class HttpHandlerUtil {
             
context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR);
 
             if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) {
-                final HttpResponse responseHeader = new 
DefaultHttpResponse(HTTP_1_1, responseMessage.getStatus().getCode());
-                responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); // 
Set this to make it "keep alive" eligible.
-                responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, 
ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
-                ctx.writeAndFlush(responseHeader);
-                ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
+                sendHttpResponse(ctx,
+                        responseMessage.getStatus().getCode(),
+                        HttpHeaderNames.CONTENT_TYPE, 
ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
             }
 
             ctx.writeAndFlush(new DefaultHttpContent(ByteBuf));
 
-            sendTrailingHeaders(ctx, responseMessage.getStatus().getCode(), 
responseMessage.getStatus().getException());
+            sendLastHttpContent(ctx, responseMessage.getStatus().getCode(), 
responseMessage.getStatus().getException());
         } catch (SerializationException se) {
             logger.warn("Unable to serialize ResponseMessage: {} ", 
responseMessage);
         }
@@ -147,7 +147,8 @@ public class HttpHandlerUtil {
      * @param statusCode    The status code to include in the trailers.
      * @param exceptionType The type of exception to include in the trailers. 
Leave blank or null if no error occurred.
      */
-    static void sendTrailingHeaders(final ChannelHandlerContext ctx, final 
HttpResponseStatus statusCode, final String exceptionType) {
+    static void sendLastHttpContent(final ChannelHandlerContext ctx, final 
HttpResponseStatus statusCode, final String exceptionType) {
+        // TODO: this might not be sent if an exception occurs early, so the 
HTTP response may not be properly terminated
         final DefaultLastHttpContent defaultLastHttpContent = new 
DefaultLastHttpContent();
         defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_CODE, 
statusCode.code());
         if (exceptionType != null && !exceptionType.isEmpty()) {
@@ -156,4 +157,26 @@ public class HttpHandlerUtil {
 
         ctx.writeAndFlush(defaultLastHttpContent);
     }
+
+    /**
+     * Sends the initial HTTP response header with the given status and 
optional header pairs.
+     * Also marks the channel as having sent a response. Headers must be 
provided as alternating
+     * name/value pairs (e.g. {@code CONTENT_TYPE, "application/json"}).
+     *
+     * @param ctx     The netty channel context.
+     * @param status  The HTTP status code for the response.
+     * @param headers Alternating header name/value pairs to set on the 
response.
+     * @throws IllegalArgumentException if headers length is not even
+     */
+    static void sendHttpResponse(final ChannelHandlerContext ctx, final 
HttpResponseStatus status, final CharSequence... headers) {
+        if ((headers.length%2) != 0) throw new 
IllegalArgumentException("Headers should come in pairs.");
+
+        final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, 
status);
+        responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
+        for (int i=0; i<headers.length; i+=2) {
+            responseHeader.headers().set(headers[i], headers[i+1]);
+        }
+        ctx.writeAndFlush(responseHeader);
+        ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
+    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
index a220b40106..fc8552a883 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
@@ -205,6 +205,9 @@ public class HttpRequestMessageDecoder extends 
MessageToMessageDecoder<FullHttpR
         final JsonNode matPropsNode = 
body.get(Tokens.ARGS_MATERIALIZE_PROPERTIES);
         if (null != matPropsNode) 
builder.addMaterializeProperties(matPropsNode.asText());
 
+        final JsonNode txIdNode = body.get(Tokens.ARGS_TRANSACTION_ID);
+        if (null != txIdNode) builder.addTransactionId(txIdNode.asText());
+
         return builder.create();
     }
 
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java
new file mode 100644
index 0000000000..f1e63ac7fc
--- /dev/null
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Manages active HTTP transactions on the server.
+ * <p>
+ * This class is responsible for:
+ * <ul>
+ *   <li>Creating new transaction contexts when clients begin transactions</li>
+ *   <li>Looking up existing transactions by ID</li>
+ *   <li>Enforcing maximum concurrent transaction limits</li>
+ *   <li>Graceful shutdown of all active transactions</li>
+ * </ul>
+ */
+public class TransactionManager {
+    private static final Logger logger = 
LoggerFactory.getLogger(TransactionManager.class);
+
+    private final ConcurrentMap<String, UnmanagedTransaction> transactions = 
new ConcurrentHashMap<>();
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final GraphManager graphManager;
+    private final long transactionTimeoutMs;
+    private final int maxConcurrentTransactions;
+
+    /**
+     * Creates a new TransactionManager with the specified configuration.
+     *
+     * @param scheduledExecutorService Scheduler for timeout management
+     * @param graphManager The graph manager for accessing traversal sources
+     * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback
+     * @param maxConcurrentTransactions Maximum number of concurrent 
transactions allowed
+     */
+    public TransactionManager(final ScheduledExecutorService 
scheduledExecutorService,
+                              final GraphManager graphManager,
+                              final long transactionTimeoutMs,
+                              final int maxConcurrentTransactions) {
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.graphManager = graphManager;
+        this.transactionTimeoutMs = transactionTimeoutMs;
+        this.maxConcurrentTransactions = maxConcurrentTransactions;
+
+        logger.info("TransactionManager initialized with timeout={}ms, 
maxTransactions={}",
+                transactionTimeoutMs, maxConcurrentTransactions);
+    }
+
+    /**
+     * Creates a new {@link UnmanagedTransaction} for the specified traversal 
source.
+     *
+     * @param traversalSourceName The traversal source alias (e.g., "g")
+     * @return The new {@link UnmanagedTransaction}, ready for task submission
+     * @throws IllegalStateException if max transactions exceeded
+     * @throws IllegalArgumentException if traversal source not found
+     * @throws UnsupportedOperationException if the graph does not support 
transactions
+     */
+    public UnmanagedTransaction create(final String traversalSourceName) {
+        if (transactions.size() >= maxConcurrentTransactions) {
+            throw new IllegalStateException(
+                "Maximum concurrent transactions exceeded (" + 
maxConcurrentTransactions + ")");
+        }
+
+        final TraversalSource ts = 
graphManager.getTraversalSource(traversalSourceName);
+        if (ts == null) {
+            throw new IllegalArgumentException("Traversal source not found: " 
+ traversalSourceName);
+        } else if (!ts.getGraph().features().graph().supportsTransactions()) {
+            throw Graph.Exceptions.transactionsNotSupported();
+        }
+
+        final UnmanagedTransaction txCtx = 
createTransactionContext(ts.getGraph());
+        logger.debug("Transaction {} created for source {}", 
txCtx.getTransactionId(), traversalSourceName);
+        return txCtx;
+    }
+
+    /**
+     * Removes a transaction from the active transactions map. Called when a 
transaction is
+     * committed, rolled back, or otherwise closed.
+     *
+     * @param id The transaction ID to remove
+     */
+    public void destroy(final String id) {
+        transactions.remove(id);
+    }
+
+    /**
+     * Creates a unique transaction ID, retrying on the unlikely UUID 
collision. The newly created
+     * {@link UnmanagedTransaction} is inserted into the transactions map.
+     */
+    private UnmanagedTransaction createTransactionContext(final Graph graph) {
+        String txId;
+        UnmanagedTransaction ctx;
+
+        do {
+            txId = UUID.randomUUID().toString();
+            ctx = new UnmanagedTransaction(
+                    txId,
+                    this,
+                    graph,
+                    scheduledExecutorService,
+                    transactionTimeoutMs
+            );
+        } while (transactions.putIfAbsent(txId, ctx) != null);
+
+        return ctx;
+    }
+
+    /**
+     * Looks up an active {@link UnmanagedTransaction} by its ID.
+     *
+     * @param transactionId The transaction ID to look up; may be {@code null}
+     * @return an Optional holding the {@link UnmanagedTransaction} when one is registered under the ID,
+     *         otherwise (including for a {@code null} ID) an empty Optional
+     */
+    public Optional<UnmanagedTransaction> get(final String transactionId) {
+        // ConcurrentHashMap throws an NPE on get(null), so answer "not found" for a null ID up front
+        return transactionId == null
+                ? Optional.empty()
+                : Optional.ofNullable(transactions.get(transactionId));
+    }
+
+    /**
+     * Returns the number of currently active transactions, i.e. the current size of the
+     * transactions map.
+     *
+     * @return the count of active transactions
+     */
+    public int getActiveTransactionCount() {
+        return transactions.size();
+    }
+
+    /**
+     * Shuts down the transaction manager, rolling back all active transactions.
+     * <p>
+     * This method should be called during server shutdown to ensure all transactions
+     * are properly cleaned up. Each transaction is closed in turn through
+     * {@link UnmanagedTransaction#close(boolean)} with {@code force == false}, which waits only a
+     * bounded time for its rollback. A rollback that fails or does not finish within that time is
+     * logged by the transaction and the transaction is discarded regardless, so this method does not
+     * guarantee that every rollback actually completed.
+     */
+    public void shutdown() {
+        final int activeCount = transactions.size();
+        logger.info("Shutting down TransactionManager with {} active transactions", activeCount);
+
+        if (activeCount == 0) return;
+
+        // Roll back all active transactions
+        transactions.values().forEach(transaction -> {
+            try {
+                transaction.close(false);
+            } catch (Exception e) {
+                // hand the exception itself to the logger (as the trailing argument) so that the stack
+                // trace is preserved rather than only the message
+                logger.warn("Error rolling back transaction {} during shutdown",
+                    transaction.getTransactionId(), e);
+            }
+        });
+
+        // drop anything that close() did not already remove
+        transactions.clear();
+        logger.info("TransactionManager shutdown complete");
+    }
+}
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java
new file mode 100644
index 0000000000..aa8fa4a7a0
--- /dev/null
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Maintains state for an active transaction over HTTP.
+ * <p>
+ * Key design principle: Graph transactions are ThreadLocal-bound, so all 
operations
+ * for a transaction must execute on the same thread. This is achieved via a
+ * single-threaded executor. Callers submit {@link FutureTask} instances that 
contain
+ * the complete request lifecycle (graph operation, error handling, response 
writing),
+ * following the same pattern as the non-transactional HTTP path and the legacy
+ * {@code SessionOpProcessor}.
+ */
+public class UnmanagedTransaction {
+    private static final Logger logger = LoggerFactory.getLogger(UnmanagedTransaction.class);
+
+    private final String transactionId;
+    private final TransactionManager manager;
+    private final Graph graph;
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Inactivity timeout in milliseconds.
+    private final long timeout;
+    // The most recently scheduled inactivity-timeout task; holds null until the first touch().
+    private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>();
+
+    // Controls whether the executor is still accepting tasks.
+    private final AtomicBoolean accepting = new AtomicBoolean(true);
+    /**
+     * Single-threaded executor ensures all operations for this transaction run on
+     * the same thread, preserving the ThreadLocal nature of Graph transactions.
+     */
+    private final ExecutorService executor;
+
+    /**
+     * Creates a new {@code UnmanagedTransaction} for managing an HTTP transaction.
+     *
+     * @param transactionId The unique identifier for this transaction
+     * @param transactionManager The manager that owns this transaction's lifecycle
+     * @param graph The graph instance for this transaction
+     * @param scheduledExecutorService Scheduler for timeout management
+     * @param transactionTimeout Timeout in milliseconds before auto-rollback
+     */
+    public UnmanagedTransaction(final String transactionId,
+                                final TransactionManager transactionManager,
+                                final Graph graph,
+                                final ScheduledExecutorService scheduledExecutorService,
+                                final long transactionTimeout) {
+        logger.debug("New transaction context established for {}", transactionId);
+        this.transactionId = transactionId;
+        this.manager = transactionManager;
+        this.graph = graph;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.timeout = transactionTimeout;
+
+        // Create single-threaded executor with named thread for debugging
+        this.executor = Executors.newSingleThreadExecutor(
+            r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length()))));
+    }
+
+    /**
+     * Returns the transaction ID.
+     */
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Resets the inactivity timeout for this transaction. Called on each request (see
+     * {@link #submit(FutureTask)}) and when the transaction is opened.
+     * <p>
+     * The replacement timeout is scheduled first and then swapped in with a single atomic
+     * {@code getAndSet}. Scheduling must not happen inside {@code updateAndGet}: its update function may
+     * be re-applied when concurrent callers contend, and every extra application would schedule another
+     * timeout task that nothing ever cancels, rolling the transaction back earlier than intended.
+     */
+    public void touch() {
+        final ScheduledFuture<?> next = scheduledExecutorService.schedule(() -> {
+            logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, timeout);
+            close(false);
+        }, timeout, TimeUnit.MILLISECONDS);
+
+        // whichever future was installed before is now superseded, so exactly one timeout stays live
+        final ScheduledFuture<?> previous = timeoutFuture.getAndSet(next);
+        if (previous != null) previous.cancel(false);
+    }
+
+    /**
+     * Opens the underlying graph transaction and starts the inactivity timeout.
+     * Should be called on the transaction's single-threaded executor to preserve
+     * ThreadLocal affinity. On failure the exception is re-thrown and the caller
+     * is responsible for cleanup (e.g. via {@link #close(boolean)}).
+     */
+    public void open() {
+        try {
+            graph.tx().open();
+            touch();
+            logger.debug("Transaction {} opened", transactionId);
+        } catch (Exception e) {
+            logger.warn("Failed to begin transaction {}: {}", transactionId, e.getMessage());
+            throw e;
+        }
+    }
+
+    /**
+     * Closes this transaction and releases its resources. When {@code force} is {@code false},
+     * any open graph transaction is rolled back before shutdown. When {@code force} is {@code true},
+     * the executor is shut down immediately without attempting a rollback.
+     * <p>
+     * The method is synchronized and returns early once the transaction is no longer registered with
+     * the manager, so repeated or concurrent calls perform the cleanup only once.
+     *
+     * @param force if {@code true}, skip the rollback attempt and shut down immediately
+     */
+    public synchronized void close(final boolean force) {
+        accepting.set(false);
+
+        // if the transaction has already been removed then there's no need to do this process again. it's possible
+        // for this to be called at roughly the same time. this prevents close() from being called more than once.
+        if (manager.get(transactionId).isEmpty()) return;
+
+        if (!force) {
+            // when not "forced", an open transaction should be rolled back. the rollback is submitted to the
+            // transaction's own executor thread and is waited on for a bounded time only.
+            try {
+                executor.submit(() -> {
+                    if (graph.tx().isOpen()) {
+                        logger.debug("Rolling back open transaction on {}", transactionId);
+                        graph.tx().rollback();
+                    }
+                }).get(1000, TimeUnit.MILLISECONDS); // TODO: wire this to configuredPerGraphCloseTimeout
+            } catch (Exception ex) {
+                logger.warn(String.format("An error occurred while attempting rollback on %s ", transactionId), ex);
+            }
+        }
+
+        // prevent any additional requests from processing. if the kill was not "forced" then jobs were scheduled to
+        // try to rollback open transactions. those jobs either timed-out or completed successfully. either way, no
+        // additional jobs will be allowed, running jobs will be cancelled (if possible) and any scheduled jobs will
+        // be cancelled
+        executor.shutdownNow();
+        manager.destroy(transactionId);
+
+        // cancel without interruption: when close() is itself running inside the timeout task, interrupting that
+        // task would set the interrupt flag on the current scheduler thread while it is still finishing up here.
+        // a timeout task that has not started yet is cancelled just the same.
+        Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(false));
+        logger.debug("Transaction {} closed", transactionId);
+    }
+
+    /**
+     * Submits a task to be executed within this transaction's thread context.
+     * The task should contain the complete request lifecycle: graph operation,
+     * error handling, and response writing.
+     *
+     * @param task The FutureTask to execute on the transaction thread
+     * @return Future that can be used for timeout cancellation
+     * @throws IllegalStateException if the transaction is closed
+     */
+    public Future<?> submit(final FutureTask<Void> task) {
+        if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed");
+
+        // NOTE: close() may still win the race between the check above and the submit below, in which case the
+        // already shut down executor rejects the task with a RejectedExecutionException.
+        touch();
+        return executor.submit(task);
+    }
+}
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
index c6bbef3ae9..e65364e505 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
@@ -144,4 +144,82 @@ public class GremlinError {
         final String message = (t.getMessage() == null) ? t.toString() : 
t.getMessage();
         return new GremlinError(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
message, "ServerErrorException");
     }
+
+    /**
+     * Builds the error reported when a request names a transaction the server does not know.
+     * Typical causes:
+     * <ul>
+     *   <li>The transaction ID was never registered (client didn't call begin)</li>
+     *   <li>The transaction timed out and was automatically rolled back</li>
+     *   <li>The transaction was already committed or rolled back</li>
+     * </ul>
+     *
+     * @param transactionId The transaction ID that was not found
+     * @return A GremlinError with status 404 (NOT_FOUND) and an explanatory message
+     */
+    public static GremlinError transactionNotFound(final String transactionId) {
+        final String message = "Transaction not found: " + transactionId
+                + ". The transaction may have timed out, already been committed/rolled back, "
+                + "or was never started. Call g.tx().begin() to start a new transaction.";
+        return new GremlinError(HttpResponseStatus.NOT_FOUND, message, "TransactionException");
+    }
+
+    /**
+     * Builds the error reported when commit or rollback arrives without a transaction ID.
+     *
+     * @return A GremlinError with status 400 (BAD_REQUEST) and an explanatory message
+     */
+    public static GremlinError transactionalControlRequiresTransaction() {
+        return new GremlinError(
+                HttpResponseStatus.BAD_REQUEST,
+                "g.tx().commit() and g.tx().rollback() are only allowed in transactional requests.",
+                "TransactionException");
+    }
+
+    /**
+     * Builds the error reported when a begin request carries a user-supplied transaction ID.
+     * Transaction IDs are generated by the server; clients should not provide one on begin.
+     *
+     * @return A GremlinError with status 400 (BAD_REQUEST) and an explanatory message
+     */
+    public static GremlinError beginHasTransactionId() {
+        return new GremlinError(
+                HttpResponseStatus.BAD_REQUEST,
+                "Begin transaction request cannot have a user-supplied transactionId",
+                "TransactionException");
+    }
+
+    /**
+     * Builds the error reported when the maximum number of concurrent transactions is exceeded.
+     *
+     * @param exceededErrorMessage The error message containing a maximum number of concurrent transactions;
+     *                             it is used as the start of the final message
+     * @return A GremlinError with status 503 (SERVICE_UNAVAILABLE) and an explanatory message
+     */
+    public static GremlinError maxTransactionsExceeded(String exceededErrorMessage) {
+        final String advice = " The server has reached its transaction limit. "
+                + "Please wait for existing transactions to complete or increase the server's "
+                + "maxConcurrentTransactions setting.";
+        return new GremlinError(
+                HttpResponseStatus.SERVICE_UNAVAILABLE, exceededErrorMessage + advice, "TransactionException");
+    }
+
+    /**
+     * Builds the error reported when a transaction operation times out.
+     *
+     * @param transactionId The transaction ID that timed out
+     * @param operation The operation that timed out (e.g., "commit", "rollback", "execute")
+     * @return A GremlinError with status 504 (GATEWAY_TIMEOUT) and an explanatory message
+     */
+    public static GremlinError transactionTimeout(final String transactionId, final String operation) {
+        final String message = "Transaction " + transactionId + " timed out during " + operation
+                + " operation. The transaction has been rolled back. "
+                + "Consider increasing the transaction timeout or breaking the operation into smaller parts.";
+        return new GremlinError(HttpResponseStatus.GATEWAY_TIMEOUT, message, "TransactionException");
+    }
+
+    /**
+     * Creates an error for when the requested graph does not support transactions.
+     * <p>
+     * If the exception carries no message, its {@code toString()} is used instead so that the error never
+     * ends up with a {@code null} message.
+     *
+     * @param uoe The exception stating that transactions aren't supported
+     * @return A GremlinError with status 400 (BAD_REQUEST) and the exception's message
+     */
+    public static GremlinError transactionNotSupported(final UnsupportedOperationException uoe) {
+        final String message = (uoe.getMessage() == null) ? uoe.toString() : uoe.getMessage();
+        return new GremlinError(HttpResponseStatus.BAD_REQUEST, message, "TransactionException");
+    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
index 97a2c8ee4a..3f29a8cad0 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.server.Channelizer;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class ServerGremlinExecutor {
     private static final Logger logger = 
LoggerFactory.getLogger(ServerGremlinExecutor.class);
 
     private final GraphManager graphManager;
+    private final TransactionManager transactionManager;
     private final Settings settings;
     private final List<LifeCycleHook> hooks;
 
@@ -197,6 +199,13 @@ public class ServerGremlinExecutor {
                 .filter(kv -> kv.getValue() instanceof LifeCycleHook)
                 .map(kv -> (LifeCycleHook) kv.getValue())
                 .collect(Collectors.toList());
+
+        transactionManager = new TransactionManager(
+                scheduledExecutorService,
+                graphManager,
+                settings.transactionTimeout,
+                settings.maxConcurrentTransactions
+        );
     }
 
     private void registerMetrics(final String engineName) {
@@ -236,6 +245,10 @@ public class ServerGremlinExecutor {
         return graphManager;
     }
 
+    /**
+     * Returns the {@link TransactionManager} that this executor created in its constructor.
+     */
+    public TransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
     public Settings getSettings() {
         return settings;
     }
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
new file mode 100644
index 0000000000..42be6f23c2
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.http.Consts;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.Serializers;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tinkerpop.gremlin.util.ser.SerTokens.TOKEN_DATA;
+import static org.apache.tinkerpop.gremlin.util.ser.SerTokens.TOKEN_RESULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Server-side integration tests for HTTP transaction protocol.
+ * <p>
+ * These tests bypass the driver entirely and use a raw Apache HTTP client to 
hit the server's HTTP endpoint directly.
+ * This validates that the server returns the correct status codes and error 
messages independent of any client-side
+ * guards.
+ */
+public class GremlinServerHttpTransactionIntegrateTest extends 
AbstractGremlinServerIntegrationTest {
+    private final String GTX = "gtx";
+    private final ObjectMapper mapper = new ObjectMapper();
+    private CloseableHttpClient client;
+
+    /**
+     * Creates a default Apache HTTP client before each test.
+     */
+    @Before
+    public void createHttpClient() {
+        client = HttpClients.createDefault();
+    }
+
+    /**
+     * Closes the HTTP client after each test.
+     */
+    @After
+    public void closeHttpClient() throws Exception {
+        client.close();
+    }
+
+    /**
+     * Selects the HTTP channelizer for every test and applies transaction settings for specific tests,
+     * keyed on the name of the test method that is about to run. The case labels must match the test
+     * method names exactly.
+     */
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+        final String nameOfTest = name.getMethodName();
+        switch (nameOfTest) {
+            case "shouldRejectRequestWhenMaxConcurrentTransactionsExceeded_22":
+                // a single slot, so a second begin exceeds the limit
+                settings.maxConcurrentTransactions = 1;
+                break;
+            case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34":
+                settings.maxConcurrentTransactions = 1;
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldTimeoutIdleTransactionWithNoOperations_30":
+                // 1 ms: the transaction times out almost immediately
+                settings.transactionTimeout = 1;
+                break;
+            case "shouldTimeoutAndRejectLateCommit_32":
+                // 1000 ms: long enough to add a vertex before the timeout fires
+                settings.transactionTimeout = 1000;
+                break;
+        }
+        return settings;
+    }
+
+    /**
+     * POSTs the given JSON document to the server's HTTP endpoint and hands back the raw response.
+     * The caller owns the response and must close it.
+     */
+    private CloseableHttpResponse postJson(final CloseableHttpClient client, final String json) throws Exception {
+        final HttpPost request = new HttpPost(TestClientFactory.createURLString());
+        request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+        request.setEntity(new StringEntity(json, Consts.UTF_8));
+        return client.execute(request);
+    }
+
+    /**
+     * Sends a begin transaction request for the given graph alias and returns the server-generated transaction ID.
+     * <p>
+     * Asserts that the request succeeds with status 200, that the {@code X-Transaction-Id} response header
+     * is present, and that the ID in the header matches the ID carried in the response body.
+     */
+    private String beginTx(final CloseableHttpClient client, final String graphAlias) throws Exception {
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + graphAlias + "\"}")) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            // verify the header exists before dereferencing it, so that a missing header is reported as an
+            // assertion failure instead of a NullPointerException
+            assertNotNull("begin response is missing the X-Transaction-Id header",
+                    response.getFirstHeader("X-Transaction-Id"));
+            final String txIdHeader = response.getFirstHeader("X-Transaction-Id").getValue();
+            assertNotNull(txIdHeader);
+
+            final String json = EntityUtils.toString(response.getEntity());
+            final JsonNode node = mapper.readTree(json);
+            final String txIdBody = node.get(TOKEN_RESULT).
+                    get(TOKEN_DATA).
+                    get(GraphSONTokens.VALUEPROP).get(0).
+                    get(GraphSONTokens.VALUEPROP).get(1).
+                    asText();
+            assertEquals(txIdHeader, txIdBody);
+            return txIdHeader;
+        }
+    }
+
+    /**
+     * Posts a traversal that is bound to an existing transaction. Caller must close the response.
+     */
+    private CloseableHttpResponse submitInTx(final CloseableHttpClient client,
+                                             final String txId,
+                                             final String gremlin,
+                                             final String graphAlias)
+            throws Exception {
+        final String body = String.format(
+                "{\"gremlin\":\"%s\",\"g\":\"%s\",\"transactionId\":\"%s\"}", gremlin, graphAlias, txId);
+        return postJson(client, body);
+    }
+
+    /**
+     * Posts a commit for an existing transaction. Caller must close the response.
+     */
+    private CloseableHttpResponse commitTx(final CloseableHttpClient client,
+                                           final String txId, final String graphAlias) throws Exception {
+        final String body = String.format(
+                "{\"gremlin\":\"g.tx().commit()\",\"g\":\"%s\",\"transactionId\":\"%s\"}", graphAlias, txId);
+        return postJson(client, body);
+    }
+
+    /**
+     * Posts a rollback for an existing transaction. Caller must close the response.
+     */
+    private CloseableHttpResponse rollbackTx(final CloseableHttpClient client,
+                                             final String txId, final String graphAlias) throws Exception {
+        final String body = String.format(
+                "{\"gremlin\":\"g.tx().rollback()\",\"g\":\"%s\",\"transactionId\":\"%s\"}", graphAlias, txId);
+        return postJson(client, body);
+    }
+
+    /**
+     * Posts a traversal outside of any transaction (the request has no transactionId). Caller must close
+     * the response.
+     */
+    private CloseableHttpResponse submitNonTx(final CloseableHttpClient client,
+                                              final String gremlin, final String graphAlias) throws Exception {
+        final String body = String.format("{\"gremlin\":\"%s\",\"g\":\"%s\"}", gremlin, graphAlias);
+        return postJson(client, body);
+    }
+
+    /**
+     * Extracts the integer count from a typical count() response: the first element of the result data list.
+     */
+    private int extractCount(final CloseableHttpResponse response) throws Exception {
+        final String json = EntityUtils.toString(response.getEntity());
+        final JsonNode node = mapper.readTree(json);
+        // use the shared serializer token for the result field, as beginTx does, rather than a literal
+        return node.get(TOKEN_RESULT).get(TOKEN_DATA)
+                .get(GraphSONTokens.VALUEPROP).get(0)
+                .get(GraphSONTokens.VALUEPROP).intValue();
+    }
+
+    /**
+     * Reads the response body and returns the text of its {@code status.message} field.
+     */
+    private String extractStatusMessage(final CloseableHttpResponse response) throws Exception {
+        final JsonNode body = mapper.readTree(EntityUtils.toString(response.getEntity()));
+        final JsonNode status = body.get("status");
+        return status.get("message").asText();
+    }
+
+    @Test
+    public void shouldNotBeginTransactionWithUserProvidedId() throws Exception {
+        final String txId = beginTx(client, GTX);
+        final String beginWithId =
+                "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\",\"transactionId\":\"" + txId + "\"}";
+
+        try (final CloseableHttpResponse response = postJson(client, beginWithId)) {
+            // Depending on whether the transaction is still open on the server when the second request arrives,
+            // there may be two different errors that the server throws.
+            final int statusCode = response.getStatusLine().getStatusCode();
+            assertTrue(statusCode == 404 || statusCode == 400);
+
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Begin transaction request cannot have a user-supplied transactionId") ||
+                    message.contains("Transaction not found"));
+        }
+    }
+
+    /**
+     * Combined test for #10 and #13: a commit on an unknown transaction ID, and a traversal or a second
+     * commit sent with an already committed transaction's ID, all return 404 "Transaction not found".
+     */
+    @Test
+    public void shouldReturn404ForInvalidCommit_10_13() throws Exception {
+        // a transaction that never existed cannot be committed
+        try (final CloseableHttpResponse response = commitTx(client, "fakeId", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            assertTrue(extractStatusMessage(response).contains("Transaction not found"));
+        }
+
+        final String txId = beginTx(client, GTX);
+
+        // add a vertex and commit
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.addV('test')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse response = commitTx(client, txId, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // #10: submit traversal on committed tx -> 404
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            assertTrue(extractStatusMessage(response).contains("Transaction not found"));
+        }
+
+        // #13: commit again on committed tx -> 404
+        try (final CloseableHttpResponse response = commitTx(client, txId, GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            assertTrue(extractStatusMessage(response).contains("Transaction not found"));
+        }
+    }
+
+    /**
+     * Combined test for #11 and #14: After rollback, submitting a traversal or another rollback
+     * with the rolled-back transaction's ID returns 404 "Transaction not found". A rollback on a
+     * transaction ID that never existed is rejected the same way.
+     */
+    @Test
+    public void shouldReturn404ForInvalidRollback_11_14() throws Exception {
+        // Can't rollback a non-existent transaction.
+        try (final CloseableHttpResponse r = rollbackTx(client, "fakeId", GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+            final String msg = extractStatusMessage(r);
+            assertTrue(msg.contains("Transaction not found"));
+        }
+
+        final String txId = beginTx(client, GTX);
+
+        // add a vertex and rollback
+        try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV('test')", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse r = rollbackTx(client, txId, GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+
+        // #11: submit traversal on rolled-back tx -> 404
+        try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) {
+            // assert the status before parsing the body, as every other step does, so that an unexpected
+            // status is reported directly rather than being hidden by a failure to parse the body
+            assertEquals(404, r.getStatusLine().getStatusCode());
+            final String msg = extractStatusMessage(r);
+            assertTrue(msg.contains("Transaction not found"));
+        }
+
+        // #14: rollback again on rolled-back tx -> 404
+        try (final CloseableHttpResponse r = rollbackTx(client, txId, GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+            final String msg = extractStatusMessage(r);
+            assertTrue(msg.contains("Transaction not found"));
+        }
+    }
+
+    /**
+     * Combined test for #20 and #27: a begin request yields a non-blank transaction ID, and a second
+     * begin yields a different one.
+     */
+    @Test
+    public void shouldReturnValidTransactionId_20_27() throws Exception {
+        // #20: begin returns a valid transaction ID
+        final String firstTxId = beginTx(client, GTX);
+        assertNotNull(firstTxId);
+        assertFalse(firstTxId.isBlank());
+
+        // #27: second begin returns a different ID
+        final String secondTxId = beginTx(client, GTX);
+        assertNotNull(secondTxId);
+        assertFalse(secondTxId.isBlank());
+        assertNotEquals(firstTxId, secondTxId);
+    }
+
+    /**
+     * #21: A traversal sent with a fabricated transaction ID is answered with 404 "Transaction not found".
+     */
+    @Test
+    public void shouldReturn404ForInvalidTransactionId_21() throws Exception {
+        final String fabricatedTxId = UUID.randomUUID().toString();
+
+        try (final CloseableHttpResponse response = submitInTx(client, fabricatedTxId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            assertTrue(extractStatusMessage(response).contains("Transaction not found"));
+        }
+    }
+
+    /**
+     * #22: With maxConcurrentTransactions=1, a second begin is rejected while the first transaction
+     * is still open.
+     */
+    @Test
+    public void shouldRejectRequestWhenMaxConcurrentTransactionsExceeded_22() throws Exception {
+        // the first begin occupies the only available slot
+        beginTx(client, GTX);
+
+        // a second begin must be refused with 503 (SERVICE_UNAVAILABLE)
+        final String secondBegin = "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\"}";
+        try (final CloseableHttpResponse response = postJson(client, secondBegin)) {
+            assertEquals(503, response.getStatusLine().getStatusCode());
+            assertTrue(extractStatusMessage(response).contains("Maximum concurrent transactions exceeded"));
+        }
+    }
+
+    /**
+     * #30: Idle transaction with no operations times out and is removed.
+     */
+    @Test
+    public void shouldTimeoutIdleTransactionWithNoOperations_30() throws 
Exception {
+        final String txId = beginTx(client, GTX);
+
+        // wait for the transaction to timeout (configured at 1ms)
+        Thread.sleep(1000);
+
+        // the transaction should be gone
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V().count()", GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+            final String msg = extractStatusMessage(r);
+            assertTrue(msg.contains("Transaction not found"));
+        }
+    }
+
+    /**
+     * #32: Late commit after timeout -- timeout fires and rolls back, then 
client sends commit.
+     * Server should return 404 and data should not persist.
+     */
+    @Test
+    public void shouldTimeoutAndRejectLateCommit_32() throws Exception {
+        final String txId = beginTx(client, GTX);
+
+        // add a vertex
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV('timeout_test')", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+
+        // wait for timeout (configured at 1000ms)
+        Thread.sleep(2000);
+
+        // attempt commit -- should fail with 404
+        try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+            final String msg = extractStatusMessage(r);
+            assertTrue(msg.contains("Transaction not found"));
+        }
+
+        // verify data was not persisted
+        try (final CloseableHttpResponse r = submitNonTx(client, 
"g.V().hasLabel('timeout_test').count()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            assertEquals(0, extractCount(r));
+        }
+    }
+
+    /**
+     * #34: Timeout frees a slot under maxConcurrentTransactions.
+     * Configured with maxConcurrentTransactions=1 and transactionTimeout=1000.
+     */
+    @Test
+    public void shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34() 
throws Exception {
+        // fill the single slot
+        beginTx(client, GTX);
+
+        // wait for timeout to reclaim the slot
+        Thread.sleep(2000);
+
+        // now a new transaction should succeed
+        final String txId = beginTx(client, GTX);
+        assertNotNull(txId);
+        assertFalse(txId.isBlank());
+    }
+
+    /**
+     * #38: Repeated operations on a closed transaction -- after commit, 
subsequent commit,
+     * rollback, and traversal all fail with 404. Same for after rollback.
+     */
+    @Test
+    public void shouldReturn404ForAllOperationsOnClosedTransaction_38() throws 
Exception {
+        // --- After commit ---
+        final String txId1 = beginTx(client, GTX);
+        try (final CloseableHttpResponse r = submitInTx(client, txId1, 
"g.addV('test38')", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse r = commitTx(client, txId1, GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+
+        // rollback-after-commit
+        try (final CloseableHttpResponse r = rollbackTx(client, txId1, GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+        }
+        // traversal-after-commit
+        try (final CloseableHttpResponse r = submitInTx(client, txId1, 
"g.V().count()", GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+        }
+
+        // --- After rollback ---
+        final String txId2 = beginTx(client, GTX);
+        try (final CloseableHttpResponse r = submitInTx(client, txId2, 
"g.addV('test38b')", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse r = rollbackTx(client, txId2, GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+
+        // commit-after-rollback
+        try (final CloseableHttpResponse r = commitTx(client, txId2, GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+        }
+        // traversal-after-rollback
+        try (final CloseableHttpResponse r = submitInTx(client, txId2, 
"g.V().count()", GTX)) {
+            assertEquals(404, r.getStatusLine().getStatusCode());
+        }
+    }
+
+    /**
+     * #38a: Traversal queued behind commit on server executor -- three
+     * requests for the same transaction are fired from separate threads: a
+     * long traversal, then a commit, then a short traversal. After the commit
+     * executes, the queued short traversal must fail with 404 and no leaked
+     * data should exist.
+     */
+    @Test
+    public void shouldNotLeakDataWhenTraversalQueuedBehindCommit_38a() throws 
Exception {
+        final String txId = beginTx(client, GTX);
+
+        // add two vertices in the transaction
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV().property(T.id, 1)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV().property(T.id, 2)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+
+        // Fire three requests concurrently: a long traversal to occupy the 
server executor, then a commit that queues
+        // behind it, then a short query that queues behind the commit. The 
short query should fail with 404 because the
+        // commit closes the transaction first.
+        final ExecutorService executor = Executors.newFixedThreadPool(3);
+        try {
+            final Future<CloseableHttpResponse> longFuture = 
executor.submit(() ->
+                    submitInTx(client, txId, 
"g.V().repeat(both()).times(1000)", GTX));
+            Thread.sleep(50);
+
+            final Future<CloseableHttpResponse> commitFuture = 
executor.submit(() ->
+                    commitTx(client, txId, GTX));
+            Thread.sleep(50);
+
+            final Future<CloseableHttpResponse> shortFuture = 
executor.submit(() ->
+                    submitInTx(client, txId, "g.V().count()", GTX));
+
+            // collect responses
+            try (final CloseableHttpResponse ignored = longFuture.get(30, 
TimeUnit.SECONDS)) {
+                // it doesn't matter what the long traversal returns, only 
that it ran
+            }
+            try (final CloseableHttpResponse commitResp = commitFuture.get(30, 
TimeUnit.SECONDS)) {
+                assertEquals(200, commitResp.getStatusLine().getStatusCode());
+            }
+            try (final CloseableHttpResponse shortResp = shortFuture.get(30, 
TimeUnit.SECONDS)) {
+                assertEquals(404, shortResp.getStatusLine().getStatusCode());
+                final String msg = extractStatusMessage(shortResp);
+                assertTrue(msg.contains("Transaction not found"));
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * #44: The g alias is locked per transaction -- sending a request with a
+     * valid transaction ID but a different g alias should be rejected. The
+     * test accepts any of 400, 404 or 500 as the error status.
+     */
+    @Test
+    public void shouldRejectMismatchedGraphAliasInTransaction_44() throws 
Exception {
+        final String txId = beginTx(client, GTX);
+
+        // send a request with the same txId but a different graph alias
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V().count()", "gclassic")) {
+            final int status = r.getStatusLine().getStatusCode();
+            assertTrue("Expected error status for alias mismatch, got " + 
status,
+                    status == 400 || status == 404 || status == 500);
+        }
+    }
+
+    /**
+     * #46: Graph alias is present on the begin request. Also tests that begin 
with no alias
+     * or an invalid alias returns an error.
+     */
+    @Test
+    public void shouldRequireGraphAliasOnBeginRequest_46() throws Exception {
+        // begin with no g alias -- should default to "g" which doesn't 
support transactions
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().begin()\"}")) {
+            final int status = response.getStatusLine().getStatusCode();
+            assertEquals(400, status);
+            assertTrue(extractStatusMessage(response).contains("Graph does not 
support transactions"));
+        }
+
+        // begin with an invalid alias -- should fail
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().begin()\",\"g\":\"nonexistent\"}")) {
+            final int status = response.getStatusLine().getStatusCode();
+            assertEquals(400, status);
+            assertTrue(extractStatusMessage(response).contains("Could not 
alias"));
+        }
+
+        // begin with valid alias -- should succeed (positive case)
+        final String txId = beginTx(client, GTX);
+        assertNotNull(txId);
+        assertFalse(txId.isBlank());
+    }
+
+    /**
+     * #47: Commit and rollback requests must carry the transaction ID. 
Sending commit/rollback
+     * without a transaction ID should fail. Sending with a valid ID should 
succeed.
+     */
+    @Test
+    public void shouldRequireTransactionIdOnCommitAndRollback_47() throws 
Exception {
+        // commit with no transactionId -- a bare "g.tx().commit()" does not
+        // belong to any transaction, so the server must not treat it as a
+        // begin or as a commit; it is expected to be rejected with 400.
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().commit()\",\"g\":\"gtx\"}")) {
+            // without a transactionId, this should not succeed as a commit
+            final int status = response.getStatusLine().getStatusCode();
+            // the server must reject this request; it must never be a
+            // successful commit of a transaction
+            assertEquals(400, status);
+            assertTrue(extractStatusMessage(response).contains("only allowed 
in transactional requests"));
+        }
+
+        // rollback with no transactionId -- same logic
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().rollback()\",\"g\":\"gtx\"}")) {
+            final int status = response.getStatusLine().getStatusCode();
+            assertEquals(400, status);
+            assertTrue(extractStatusMessage(response).contains("only allowed 
in transactional requests"));
+        }
+    }
+
+    /**
+     * #53: Begin on a graph that doesn't support transactions fails at begin 
time.
+     */
+    @Test
+    public void shouldRejectBeginOnNonTransactionalGraph_53() throws Exception 
{
+        // gclassic is backed by TinkerGraph (non-transactional)
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().begin()\",\"g\":\"gclassic\"}")) {
+            final int status = response.getStatusLine().getStatusCode();
+            assertTrue("Expected error for non-transactional graph, got " + 
status,
+                    status == 400 || status == 500);
+        }
+    }
+
+    /**
+     * #66: Full begin/addV/commit flow using the GraphBinary serializer,
+     * verifying the transactionId field round-trips correctly. The GraphSON
+     * counterpart (#67) is shouldRoundTripTransactionIdWithGraphSON_67.
+     */
+    @Test
+    public void shouldRoundTripTransactionIdWithGraphBinary_66() throws 
Exception {
+        final GraphBinaryMessageSerializerV4 serializer = new 
GraphBinaryMessageSerializerV4();
+
+        // begin via GraphBinary
+        final ByteBuf beginReq = serializer.serializeRequestAsBinary(
+                RequestMessage.build("g.tx().begin()").addG(GTX).create(),
+                new UnpooledByteBufAllocator(false));
+        final HttpPost beginPost = new 
HttpPost(TestClientFactory.createURLString());
+        beginPost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHBINARY_V4.getValue());
+        beginPost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHBINARY_V4.getValue());
+        beginPost.setEntity(new ByteArrayEntity(beginReq.array()));
+
+        String txId;
+        try (final CloseableHttpResponse response = client.execute(beginPost)) 
{
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            final ResponseMessage rm = 
serializer.readChunk(toByteBuf(response.getEntity()), true);
+            final List<?> data = (List<?>) rm.getResult().getData();
+            assertNotNull(data);
+            assertTrue(data.size() > 0);
+            // the data should contain a map with transactionId
+            final Map<?, ?> map = (java.util.Map<?, ?>) data.get(0);
+            txId = (String) map.get(Tokens.ARGS_TRANSACTION_ID);
+            assertNotNull(txId);
+            assertFalse(txId.isBlank());
+        }
+
+        // addV via GraphBinary
+        final ByteBuf addVReq = serializer.serializeRequestAsBinary(
+                
RequestMessage.build("g.addV('binary_test')").addG(GTX).addTransactionId(txId).create(),
+                new UnpooledByteBufAllocator(false));
+        final HttpPost addVPost = new 
HttpPost(TestClientFactory.createURLString());
+        addVPost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHBINARY_V4.getValue());
+        addVPost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHBINARY_V4.getValue());
+        addVPost.setEntity(new ByteArrayEntity(addVReq.array()));
+        try (final CloseableHttpResponse response = client.execute(addVPost)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // commit via GraphBinary
+        final ByteBuf commitReq = serializer.serializeRequestAsBinary(
+                
RequestMessage.build("g.tx().commit()").addG(GTX).addTransactionId(txId).create(),
+                new UnpooledByteBufAllocator(false));
+        final HttpPost commitPost = new 
HttpPost(TestClientFactory.createURLString());
+        commitPost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHBINARY_V4.getValue());
+        commitPost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHBINARY_V4.getValue());
+        commitPost.setEntity(new ByteArrayEntity(commitReq.array()));
+        try (final CloseableHttpResponse response = 
client.execute(commitPost)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // verify data persisted
+        try (final CloseableHttpResponse r = submitNonTx(client, 
"g.V().hasLabel('binary_test').count()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            assertEquals(1, extractCount(r));
+        }
+    }
+
+    private static ByteBuf toByteBuf(final org.apache.http.HttpEntity 
httpEntity) throws java.io.IOException {
+        final byte[] asArray = EntityUtils.toByteArray(httpEntity);
+        return Unpooled.wrappedBuffer(asArray);
+    }
+    @Test
+    public void shouldRoundTripTransactionIdWithGraphSON_67() throws Exception 
{
+        final GraphSONMessageSerializerV4 serializer = new 
GraphSONMessageSerializerV4();
+
+        // begin via GraphSON
+        final ByteBuf beginReq = serializer.serializeRequestAsBinary(
+                RequestMessage.build("g.tx().begin()").addG(GTX).create(),
+                new UnpooledByteBufAllocator(false));
+        final HttpPost beginPost = new 
HttpPost(TestClientFactory.createURLString());
+        beginPost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHSON_V4.getValue());
+        beginPost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHSON_V4.getValue());
+        beginPost.setEntity(new ByteArrayEntity(beginReq.array()));
+
+        String txId;
+        try (final CloseableHttpResponse response = client.execute(beginPost)) 
{
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            assertEquals(Serializers.GRAPHSON_V4.getValue(), 
response.getEntity().getContentType().getValue());
+            final String json = EntityUtils.toString(response.getEntity());
+            final JsonNode node = mapper.readTree(json);
+            txId = node.get("result").get(TOKEN_DATA)
+                    .get(GraphSONTokens.VALUEPROP).get(0)
+                    .get(GraphSONTokens.VALUEPROP).get(1)
+                    .asText();
+
+            assertNotNull(txId);
+            assertFalse(txId.isBlank());
+        }
+
+        // addV via GraphSON
+        final ByteBuf addVReq = serializer.serializeRequestAsBinary(
+                
RequestMessage.build("g.addV('graphson_test')").addG(GTX).addTransactionId(txId).create(),
+                new UnpooledByteBufAllocator(false));
+        final HttpPost addVPost = new 
HttpPost(TestClientFactory.createURLString());
+        addVPost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHSON_V4.getValue());
+        addVPost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHSON_V4.getValue());
+        addVPost.setEntity(new ByteArrayEntity(addVReq.array()));
+        try (final CloseableHttpResponse response = client.execute(addVPost)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // commit via GraphSON
+        final ByteBuf commitReq = serializer.serializeRequestAsBinary(
+                
RequestMessage.build("g.tx().commit()").addG(GTX).addTransactionId(txId).create(),
+                new UnpooledByteBufAllocator(false));
+        final HttpPost commitPost = new 
HttpPost(TestClientFactory.createURLString());
+        commitPost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHSON_V4.getValue());
+        commitPost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHSON_V4.getValue());
+        commitPost.setEntity(new ByteArrayEntity(commitReq.array()));
+        try (final CloseableHttpResponse response = 
client.execute(commitPost)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // verify data persisted
+        try (final CloseableHttpResponse r = submitNonTx(client, 
"g.V().hasLabel('graphson_test').count()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            assertEquals(1, extractCount(r));
+        }
+    }
+
+    /**
+     * Test #42: The transaction ID is returned in the
+     * {@code X-Transaction-Id} response header on begin, submit, and commit.
+     * On begin and commit it is also carried in the response body and must
+     * match the header.
+     * NOTE: the original dual-transmission contract was relaxed -- for
+     * traversals inside a transaction the server returns the ID in the header
+     * only, not in the body.
+     */
+    @Test
+    public void shouldReturnTransactionIdHeader_42() throws Exception {
+        final String beginJson = "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + 
GTX + "\"}";
+        String txIdFromBegin;
+        try (final CloseableHttpResponse response = postJson(client, 
beginJson)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            // header must be present
+            assertNotNull(response.getFirstHeader("X-Transaction-Id"));
+            final String txIdFromHeader = 
response.getFirstHeader("X-Transaction-Id").getValue();
+            assertNotNull(txIdFromHeader);
+            assertFalse(txIdFromHeader.isBlank());
+
+            // body must contain the same transaction ID
+            final String json = EntityUtils.toString(response.getEntity());
+            final JsonNode node = mapper.readTree(json);
+            txIdFromBegin = node.get(TOKEN_RESULT).get(TOKEN_DATA)
+                    .get(GraphSONTokens.VALUEPROP).get(0)
+                    .get(GraphSONTokens.VALUEPROP).get(1)
+                    .asText();
+            assertNotNull(txIdFromBegin);
+            assertFalse(txIdFromBegin.isBlank());
+
+            assertEquals("Transaction ID in header and body must match on 
begin",
+                    txIdFromHeader, txIdFromBegin);
+        }
+
+        try (final CloseableHttpResponse response = submitInTx(client, 
txIdFromBegin, "g.addV('dual_test')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            // Body doesn't contain the Transaction ID for Traversals in 
transactions.
+            assertEquals(txIdFromBegin, 
response.getFirstHeader("X-Transaction-Id").getValue());
+        }
+
+        try (final CloseableHttpResponse response = commitTx(client, 
txIdFromBegin, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            final JsonNode jsonResponse = 
mapper.readTree(EntityUtils.toString(response.getEntity()));
+            final String txIdFromCommit = 
jsonResponse.get(TOKEN_RESULT).get(TOKEN_DATA)
+                    .get(GraphSONTokens.VALUEPROP).get(0)
+                    .get(GraphSONTokens.VALUEPROP).get(1)
+                    .asText();
+            assertEquals(txIdFromBegin, txIdFromCommit);
+
+            assertEquals("Transaction ID in header must match on submit",
+                    txIdFromBegin, 
response.getFirstHeader("X-Transaction-Id").getValue());
+        }
+    }
+
+    // TODO: move #39 over to this class, as it's a server-side test rather
+    // than a client-side test
+}
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
index 4e83032df4..43cc6109d2 100644
--- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
+++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
@@ -95,4 +95,11 @@ public final class Tokens {
      * identifying the kind of client it came from.
      */
     public static final String ARGS_USER_AGENT = "userAgent";
+
+    /**
+     * Argument name for the transaction ID used to track multi-request
+     * transactions over HTTP. The transaction ID (typically a UUID) is
+     * returned by the server in response to the begin request and must be
+     * included by the client in every subsequent request within the
+     * transaction.
+     */
+    public static final String ARGS_TRANSACTION_ID = "transactionId";
 }
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
index 30072dd3bd..a64cffada6 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
@@ -40,6 +40,7 @@ public final class RequestMessage {
         this.fields = fields;
 
         this.fields.putIfAbsent(Tokens.ARGS_LANGUAGE, "gremlin-lang");
+        this.fields.putIfAbsent(Tokens.ARGS_G, "g");
     }
 
     /**
@@ -165,6 +166,20 @@ public final class RequestMessage {
             return this;
         }
 
+        /**
+         * Adds a transaction ID to the request message. The transaction ID is 
used to track
+         * multi-request transactions over HTTP. All requests within a 
transaction must include
+         * the same transaction ID.
+         *
+         * @param transactionId the unique transaction identifier (typically a 
UUID)
+         * @return this builder
+         */
+        public Builder addTransactionId(final String transactionId) {
+            Objects.requireNonNull(transactionId, "transactionId argument 
cannot be null.");
+            this.fields.put(Tokens.ARGS_TRANSACTION_ID, transactionId);
+            return this;
+        }
+
         /**
          * Create the request message given the settings provided to the 
{@link Builder}.
          */
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
index bc0212dfdf..caa3def020 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
@@ -338,6 +338,9 @@ public abstract class AbstractGraphSONMessageSerializerV4 
extends AbstractMessag
             if (data.containsKey(Tokens.BULK_RESULTS)) {
                 
builder.addBulkResults(Boolean.parseBoolean(data.get(Tokens.BULK_RESULTS).toString()));
             }
+            if (data.containsKey(Tokens.ARGS_TRANSACTION_ID)) {
+                
builder.addTransactionId(data.get(Tokens.ARGS_TRANSACTION_ID).toString());
+            }
 
             return builder.create();
         }
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
index 4d37f40e33..9397e5afde 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
@@ -73,6 +73,9 @@ public class RequestMessageSerializer {
             if (fields.containsKey(Tokens.BULK_RESULTS)) {
                 
builder.addBulkResults(Boolean.parseBoolean(fields.get(Tokens.BULK_RESULTS).toString()));
             }
+            if (fields.containsKey(Tokens.ARGS_TRANSACTION_ID)) {
+                
builder.addTransactionId(fields.get(Tokens.ARGS_TRANSACTION_ID).toString());
+            }
 
             return builder.create();
         } catch (IOException ex) {


Reply via email to