This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-tx-client in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 495156538bd8d98a0184c3f09886589e3035ae6f Author: Ken Hu <[email protected]> AuthorDate: Mon Mar 16 17:08:46 2026 -0700 server implementation of transactions --- .../apache/tinkerpop/gremlin/server/Context.java | 11 + .../tinkerpop/gremlin/server/GremlinServer.java | 5 + .../apache/tinkerpop/gremlin/server/Settings.java | 18 + .../gremlin/server/channel/HttpChannelizer.java | 4 +- .../server/handler/HttpGremlinEndpointHandler.java | 165 ++++- .../gremlin/server/handler/HttpHandlerUtil.java | 37 +- .../server/handler/HttpRequestMessageDecoder.java | 3 + .../gremlin/server/handler/TransactionManager.java | 179 +++++ .../server/handler/UnmanagedTransaction.java | 180 +++++ .../gremlin/server/util/GremlinError.java | 78 +++ .../gremlin/server/util/ServerGremlinExecutor.java | 13 + .../GremlinServerHttpTransactionIntegrateTest.java | 764 +++++++++++++++++++++ .../org/apache/tinkerpop/gremlin/util/Tokens.java | 7 + .../gremlin/util/message/RequestMessage.java | 15 + .../ser/AbstractGraphSONMessageSerializerV4.java | 3 + .../util/ser/binary/RequestMessageSerializer.java | 3 + 16 files changed, 1457 insertions(+), 28 deletions(-) diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java index 21b7c0b636..06e2507e75 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java @@ -56,6 +56,7 @@ public class Context { private ScheduledFuture<?> timeoutExecutor = null; private boolean timeoutExecutorGrabbed = false; private final Object timeoutExecutorLock = new Object(); + private String transactionId; // initially null for non-transactional requests and begin() calls; set after transaction creation. 
public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx, final Settings settings, final GraphManager graphManager, @@ -80,6 +81,7 @@ public class Context { this.requestState = requestState; this.requestTimeout = determineTimeout(); this.materializeProperties = determineMaterializeProperties(); + this.transactionId = requestMessage.getField(Tokens.ARGS_TRANSACTION_ID); } public void setTimeoutExecutor(final ScheduledFuture<?> timeoutExecutor) { @@ -119,6 +121,15 @@ public class Context { return scheduledExecutorService; } + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(final String transactionId) { + this.transactionId = transactionId; + } + + /** * Gets the current request to Gremlin Server. */ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java index 1314af6225..16d8a2d045 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java @@ -293,6 +293,11 @@ public class GremlinServer { logger.warn("Timeout waiting for boss/worker thread pools to shutdown - continuing with shutdown process."); } + if (serverGremlinExecutor != null) { + logger.info("Shutting down TransactionManager"); + serverGremlinExecutor.getTransactionManager().shutdown(); + } + // close TraversalSource and Graph instances - there aren't guarantees that closing Graph will close all // spawned TraversalSource instances so both should be closed directly and independently. 
if (serverGremlinExecutor != null) { diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 5e56c76ed9..3489bd511e 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -184,6 +184,24 @@ public class Settings { */ public boolean strictTransactionManagement = false; + /** + * Time in milliseconds that a transaction can remain idle before it is automatically rolled back. + * This prevents resource leaks from abandoned transactions. Default is 600000 (10 minutes). + */ + public long transactionTimeout = 600000L; + + /** + * Time in milliseconds to wait for a transaction commit or rollback operation to complete. + * Default is 10000 (10 seconds). + */ + public long perGraphCloseTimeout = 10000L; + + /** + * Maximum number of concurrent transactions allowed on the server. + * Default is 1000. + */ + public int maxConcurrentTransactions = 1000; + /** * The full class name of the {@link Channelizer} to use in Gremlin Server. 
*/ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java index e90ae340bf..e5643f5154 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java @@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.handler.HttpRequestIdHandler; import org.apache.tinkerpop.gremlin.server.handler.HttpRequestMessageDecoder; import org.apache.tinkerpop.gremlin.server.handler.HttpUserAgentHandler; import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; +import org.apache.tinkerpop.gremlin.server.handler.TransactionManager; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; @@ -61,7 +62,8 @@ public class HttpChannelizer extends AbstractChannelizer { @Override public void init(final ServerGremlinExecutor serverGremlinExecutor) { super.init(serverGremlinExecutor); - httpGremlinEndpointHandler = new HttpGremlinEndpointHandler(gremlinExecutor, graphManager, settings); + httpGremlinEndpointHandler = new HttpGremlinEndpointHandler( + gremlinExecutor, graphManager, settings, serverGremlinExecutor.getTransactionManager()); } @Override diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index cbc9f2d616..86a9b3943b 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -27,9 +27,7 @@ import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; @@ -57,6 +55,7 @@ import org.apache.tinkerpop.gremlin.server.util.TraverserIterator; import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; import org.apache.tinkerpop.gremlin.structure.util.TemporaryException; import org.apache.tinkerpop.gremlin.util.ExceptionHelper; @@ -82,6 +81,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -92,6 +92,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.stream.Stream; import static com.codahale.metrics.MetricRegistry.name; @@ -107,7 +108,8 @@ import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHan import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHING; import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.NOT_STARTED; import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.STREAMING; -import 
static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendTrailingHeaders; +import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendHttpResponse; +import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendLastHttpContent; import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeError; /** @@ -168,13 +170,16 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ private final GremlinExecutor gremlinExecutor; private final GraphManager graphManager; private final Settings settings; + private final TransactionManager transactionManager; public HttpGremlinEndpointHandler(final GremlinExecutor gremlinExecutor, final GraphManager graphManager, - final Settings settings) { + final Settings settings, + final TransactionManager transactionManager) { this.gremlinExecutor = gremlinExecutor; this.graphManager = graphManager; this.settings = settings; + this.transactionManager = transactionManager; } @Override @@ -210,18 +215,36 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ requestMessage.getGremlin()); } - // Send back the 200 OK response header here since the response is always chunk transfer encoded. Any - // failures that follow this will show up in the response body instead. - final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, OK); - if (acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING))) { - responseHeader.headers().add(CONTENT_ENCODING, DEFLATE); + // These guards prevent any obvious failures from returning 200 OK early by detecting them here and + // throwing before any other processing starts so the user gets a better error code. 
+ final String txId = requestCtx.getTransactionId(); + final String gremlin = requestMessage.getGremlin(); + if (isTransactionBegin(gremlin)) { + // If this is a begin transaction request then we need to create the Transaction ID first since the + // dual-transmission expectation means the response header below should contain it. + + // This prevents accidentally re-opening the underlying transaction. + if (txId != null) throw new ProcessingException(GremlinError.beginHasTransactionId()); + + handleBegin(requestCtx); + } else if (txId != null) { + // This check makes sure that the underlying Graph is already open to stop a closed transaction + // from re-opening due to the default autostart nature of transactions. This occurs in cases where a + // transactional traversal is submitted after a commit/rollback. + final Graph g = graphManager.getTraversalSource(requestMessage.getField(Tokens.ARGS_G)).getGraph(); + if ((!g.tx().isOpen())) { + throw new ProcessingException(GremlinError.transactionNotFound(txId)); + } + } else if ((txId == null) && (isTransactionCommit(gremlin) || isTransactionRollback(gremlin))) { + // Logically, commit/rollback should only be allowed on a transactional request. + throw new ProcessingException(GremlinError.transactionalControlRequiresTransaction()); } - responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); - responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0()); - ctx.writeAndFlush(responseHeader); - ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true); - iterateScriptEvalResult(requestCtx, serializer.getValue1(), requestMessage); + // Send back the 200 OK response header here since the response is always chunk transfer encoded. Any + // failures that follow this will show up in the response body instead. 
+ sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); + sendHttpContents(ctx, requestCtx); + sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); } catch (Throwable t) { writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1()); } finally { @@ -240,7 +263,9 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ }); try { - final Future<?> executionFuture = requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture); + final Future<?> executionFuture = (requestCtx.getTransactionId() != null) ? + transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) : + requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture); if (seto > 0) { // Schedule a timeout in the thread pool for future execution requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(() -> { @@ -252,6 +277,47 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ } } catch (RejectedExecutionException ree) { writeError(requestCtx, GremlinError.rateLimiting(), serializer.getValue1()); + } catch (NoSuchElementException nsee) { + writeError(requestCtx, GremlinError.transactionNotFound(requestCtx.getTransactionId()), serializer.getValue1()); + } + } + + private List<CharSequence> createResponseHeaders(final ChannelHandlerContext ctx, + final Pair<String, MessageSerializer<?>> serializer, + final Context requestCtx) { + final List<CharSequence> headers = new ArrayList<>(); + headers.add(HttpHeaderNames.CONTENT_TYPE); + headers.add(serializer.getValue0()); + if (acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING))) { + headers.add(CONTENT_ENCODING); + headers.add(DEFLATE); + } + if (requestCtx.getTransactionId() != null) { + headers.add("X-Transaction-Id"); + headers.add(requestCtx.getTransactionId()); + } + return headers; + } + + private void 
sendHttpContents(final ChannelHandlerContext ctx, final Context requestContext) throws Exception { + final Pair<String, MessageSerializer<?>> serializer = ctx.channel().attr(StateKey.SERIALIZER).get(); + final RequestMessage request = requestContext.getRequestMessage(); + final String txId = requestContext.getTransactionId(); + final Optional<UnmanagedTransaction> transaction = transactionManager.get(txId); + final Graph graph = graphManager.getTraversalSource(request.getField(Tokens.ARGS_G)).getGraph(); + + // Early guard against fake or incorrect transaction IDs. + if ((txId != null) && transaction.isEmpty()) throw new ProcessingException(GremlinError.transactionNotFound(txId)); + + if (isTransactionBegin(request.getGremlin())) { + runBegin(requestContext, transaction.get(), serializer); + } else if (isTransactionCommit(request.getGremlin())) { + handleGraphOp(requestContext, txId, graph, Transaction::commit, serializer); + } else if (isTransactionRollback(requestContext.getRequestMessage().getGremlin())) { + handleGraphOp(requestContext, txId, graph, Transaction::rollback, serializer); + } else { + // Both transactional and non-transactional traversals follow this path for response chunking. + iterateScriptEvalResult(requestContext, serializer.getValue1(), request); } } @@ -372,6 +438,70 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ } } + /** + * Detects if the gremlin script is a transaction begin command. + */ + private boolean isTransactionBegin(final String gremlin) { + if (gremlin == null) return false; + return gremlin.trim().equalsIgnoreCase("g.tx().begin()"); + } + + /** + * Detects if the gremlin script is a transaction commit command. + */ + private boolean isTransactionCommit(final String gremlin) { + if (gremlin == null) return false; + return gremlin.trim().equalsIgnoreCase("g.tx().commit()"); + } + + /** + * Detects if the gremlin script is a transaction rollback command. 
+ */ + private boolean isTransactionRollback(final String gremlin) { + if (gremlin == null) return false; + return gremlin.trim().equalsIgnoreCase("g.tx().rollback()"); + } + + /** + * Handle begin by creating an {@link UnmanagedTransaction} and submitting the open to its executor. + */ + private void handleBegin(final Context ctx) throws Exception { + final String traversalSourceName = ctx.getRequestMessage().getField(Tokens.ARGS_G); + + final UnmanagedTransaction txCtx; + try { + txCtx = transactionManager.create(traversalSourceName); + ctx.setTransactionId(txCtx.getTransactionId()); + final Graph graph = graphManager.getTraversalSource(traversalSourceName).getGraph(); + txCtx.submit(new FutureTask<>(() -> { + graph.tx().open(); + return null; + })).get(); // TODO: institute timeout + } catch (IllegalStateException ise) { + throw new ProcessingException(GremlinError.maxTransactionsExceeded(ise.getMessage())); + } catch (IllegalArgumentException iae) { + throw new ProcessingException(GremlinError.binding(traversalSourceName)); + } catch (UnsupportedOperationException uoe) { + throw new ProcessingException(GremlinError.transactionNotSupported(uoe)); + } + } + + private void runBegin(final Context ctx, UnmanagedTransaction tx, final Pair<String, MessageSerializer<?>> serializer) throws Exception { + final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false, false); + ctx.getChannelHandlerContext().writeAndFlush(new DefaultHttpContent(chunk)); + } + + private void handleGraphOp(final Context ctx, + final String transactionId, + final Graph graph, + final Consumer<Transaction> graphOp, + final Pair<String, MessageSerializer<?>> serializer) throws Exception { + graphOp.accept(graph.tx()); + transactionManager.destroy(transactionId); + final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false); + 
ctx.getChannelHandlerContext().writeAndFlush(new DefaultHttpContent(chunk)); + } + private Bindings mergeBindingsFromRequest(final Context ctx, final Bindings bindings) throws ProcessingException { // alias any global bindings to a different variable. final RequestMessage msg = ctx.getRequestMessage(); @@ -431,7 +561,6 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ // it needs to be released here if (chunk != null) chunk.release(); } - sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, ""); return; } @@ -518,10 +647,6 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ } nettyContext.writeAndFlush(new DefaultHttpContent(chunk)); - - if (!hasMore) { - sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, ""); - } } } else { final long currentTime = System.currentTimeMillis(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index 1e84e35dde..782be1100c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -97,6 +97,8 @@ public class HttpHandlerUtil { * @param serializer The serializer to use to serialize the error response. 
*/ static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer<?> serializer) { + logger.debug("Regular path writing error: {}", responseMessage); + try { final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING @@ -106,16 +108,14 @@ public class HttpHandlerUtil { context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR); if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) { - final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, responseMessage.getStatus().getCode()); - responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); // Set this to make it "keep alive" eligible. - responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, ctx.channel().attr(StateKey.SERIALIZER).get().getValue0()); - ctx.writeAndFlush(responseHeader); - ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true); + sendHttpResponse(ctx, + responseMessage.getStatus().getCode(), + HttpHeaderNames.CONTENT_TYPE, ctx.channel().attr(StateKey.SERIALIZER).get().getValue0()); } ctx.writeAndFlush(new DefaultHttpContent(ByteBuf)); - sendTrailingHeaders(ctx, responseMessage.getStatus().getCode(), responseMessage.getStatus().getException()); + sendLastHttpContent(ctx, responseMessage.getStatus().getCode(), responseMessage.getStatus().getException()); } catch (SerializationException se) { logger.warn("Unable to serialize ResponseMessage: {} ", responseMessage); } @@ -147,7 +147,8 @@ public class HttpHandlerUtil { * @param statusCode The status code to include in the trailers. * @param exceptionType The type of exception to include in the trailers. Leave blank or null if no error occurred. 
*/ - static void sendTrailingHeaders(final ChannelHandlerContext ctx, final HttpResponseStatus statusCode, final String exceptionType) { + static void sendLastHttpContent(final ChannelHandlerContext ctx, final HttpResponseStatus statusCode, final String exceptionType) { + // TODO: this might be not sent if exception occurs early so HTTP not properly terminated final DefaultLastHttpContent defaultLastHttpContent = new DefaultLastHttpContent(); defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_CODE, statusCode.code()); if (exceptionType != null && !exceptionType.isEmpty()) { @@ -156,4 +157,26 @@ public class HttpHandlerUtil { ctx.writeAndFlush(defaultLastHttpContent); } + + /** + * Sends the initial HTTP response header with the given status and optional header pairs. + * Also marks the channel as having sent a response. Headers must be provided as alternating + * name/value pairs (e.g. {@code CONTENT_TYPE, "application/json"}). + * + * @param ctx The netty channel context. + * @param status The HTTP status code for the response. + * @param headers Alternating header name/value pairs to set on the response. + * @throws IllegalArgumentException if headers length is not even + */ + static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpResponseStatus status, final CharSequence... 
headers) { + if ((headers.length%2) != 0) throw new IllegalArgumentException("Headers should come in pairs."); + + final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, status); + responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); + for (int i=0; i<headers.length; i+=2) { + responseHeader.headers().set(headers[i], headers[i+1]); + } + ctx.writeAndFlush(responseHeader); + ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true); + } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java index a220b40106..fc8552a883 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java @@ -205,6 +205,9 @@ public class HttpRequestMessageDecoder extends MessageToMessageDecoder<FullHttpR final JsonNode matPropsNode = body.get(Tokens.ARGS_MATERIALIZE_PROPERTIES); if (null != matPropsNode) builder.addMaterializeProperties(matPropsNode.asText()); + final JsonNode txIdNode = body.get(Tokens.ARGS_TRANSACTION_ID); + if (null != txIdNode) builder.addTransactionId(txIdNode.asText()); + return builder.create(); } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java new file mode 100644 index 0000000000..f1e63ac7fc --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.handler; + +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.server.GraphManager; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; + +/** + * Manages active HTTP transactions on the server. + * <p> + * This class is responsible for: + * <ul> + * <li>Creating new transaction contexts when clients begin transactions</li> + * <li>Looking up existing transactions by ID</li> + * <li>Enforcing maximum concurrent transaction limits</li> + * <li>Graceful shutdown of all active transactions</li> + * </ul> + */ +public class TransactionManager { + private static final Logger logger = LoggerFactory.getLogger(TransactionManager.class); + + private final ConcurrentMap<String, UnmanagedTransaction> transactions = new ConcurrentHashMap<>(); + + private final ScheduledExecutorService scheduledExecutorService; + private final GraphManager graphManager; + private final long transactionTimeoutMs; + private final int maxConcurrentTransactions; + + /** + * Creates a new TransactionManager with the specified configuration. 
+ * + * @param scheduledExecutorService Scheduler for timeout management + * @param graphManager The graph manager for accessing traversal sources + * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback + * @param maxConcurrentTransactions Maximum number of concurrent transactions allowed + */ + public TransactionManager(final ScheduledExecutorService scheduledExecutorService, + final GraphManager graphManager, + final long transactionTimeoutMs, + final int maxConcurrentTransactions) { + this.scheduledExecutorService = scheduledExecutorService; + this.graphManager = graphManager; + this.transactionTimeoutMs = transactionTimeoutMs; + this.maxConcurrentTransactions = maxConcurrentTransactions; + + logger.info("TransactionManager initialized with timeout={}ms, maxTransactions={}", + transactionTimeoutMs, maxConcurrentTransactions); + } + + /** + * Creates a new {@link UnmanagedTransaction} for the specified traversal source. + * + * @param traversalSourceName The traversal source alias (e.g., "g") + * @return The new {@link UnmanagedTransaction}, ready for task submission + * @throws IllegalStateException if max transactions exceeded + * @throws IllegalArgumentException if traversal source not found + * @throws UnsupportedOperationException if the graph does not support transactions + */ + public UnmanagedTransaction create(final String traversalSourceName) { + if (transactions.size() >= maxConcurrentTransactions) { + throw new IllegalStateException( + "Maximum concurrent transactions exceeded (" + maxConcurrentTransactions + ")"); + } + + final TraversalSource ts = graphManager.getTraversalSource(traversalSourceName); + if (ts == null) { + throw new IllegalArgumentException("Traversal source not found: " + traversalSourceName); + } else if (!ts.getGraph().features().graph().supportsTransactions()) { + throw Graph.Exceptions.transactionsNotSupported(); + } + + final UnmanagedTransaction txCtx = createTransactionContext(ts.getGraph()); + 
logger.debug("Transaction {} created for source {}", txCtx.getTransactionId(), traversalSourceName); + return txCtx; + } + + /** + * Removes a transaction from the active transactions map. Called when a transaction is + * committed, rolled back, or otherwise closed. + * + * @param id The transaction ID to remove + */ + public void destroy(final String id) { + transactions.remove(id); + } + + /** + * Creates a unique transaction ID, retrying on the unlikely UUID collision. The newly created + * {@link UnmanagedTransaction} is inserted into the transactions map. + */ + private UnmanagedTransaction createTransactionContext(final Graph graph) { + String txId; + UnmanagedTransaction ctx; + + do { + txId = UUID.randomUUID().toString(); + ctx = new UnmanagedTransaction( + txId, + this, + graph, + scheduledExecutorService, + transactionTimeoutMs + ); + } while (transactions.putIfAbsent(txId, ctx) != null); + + return ctx; + } + + /** + * Gets an existing {@link UnmanagedTransaction} by ID. + * + * @param transactionId The transaction ID to look up + * @return Optional containing the {@link UnmanagedTransaction} if found, empty otherwise + */ + public Optional<UnmanagedTransaction> get(final String transactionId) { + if (null == transactionId) return Optional.empty(); // Prevent NPE from calling get(null) on ConcurrentHashMap + return Optional.ofNullable(transactions.get(transactionId)); + } + + /** + * Returns the number of currently active transactions. + * + * @return the count of active transactions + */ + public int getActiveTransactionCount() { + return transactions.size(); + } + + /** + * Shuts down the transaction manager, rolling back all active transactions. + * <p> + * This method should be called during server shutdown to ensure all transactions + * are properly cleaned up. It blocks until all rollbacks complete. 
+ */ + public void shutdown() { + final int activeCount = transactions.size(); + logger.info("Shutting down TransactionManager with {} active transactions", activeCount); + + if (activeCount == 0) return; + + // Roll back all active transactions + transactions.values().forEach(transaction -> { + try { + transaction.close(false); + } catch (Exception e) { + logger.warn("Error rolling back transaction {} during shutdown: {}", + transaction.getTransactionId(), e.getMessage()); + } + }); + + transactions.clear(); + logger.info("TransactionManager shutdown complete"); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java new file mode 100644 index 0000000000..aa8fa4a7a0 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.server.handler; + +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Maintains state for an active transaction over HTTP. + * <p> + * Key design principle: Graph transactions are ThreadLocal-bound, so all operations + * for a transaction must execute on the same thread. This is achieved via a + * single-threaded executor. Callers submit {@link FutureTask} instances that contain + * the complete request lifecycle (graph operation, error handling, response writing), + * following the same pattern as the non-transactional HTTP path and the legacy + * {@code SessionOpProcessor}. + */ +public class UnmanagedTransaction { + private static final Logger logger = LoggerFactory.getLogger(UnmanagedTransaction.class); + + private final String transactionId; + private final TransactionManager manager; + private final Graph graph; + private final ScheduledExecutorService scheduledExecutorService; + private final long timeout; + private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>(); + + // Controls whether the executor is still accepting tasks. + private final AtomicBoolean accepting = new AtomicBoolean(true); + /** + * Single-threaded executor ensures all operations for this transaction run on + * the same thread, preserving the ThreadLocal nature of Graph transactions. 
+ */ + private final ExecutorService executor; + + /** + * Creates a new {@code UnmanagedTransaction} for managing an HTTP transaction. + * + * @param transactionId The unique identifier for this transaction + * @param transactionManager The manager that owns this transaction's lifecycle + * @param graph The graph instance for this transaction + * @param scheduledExecutorService Scheduler for timeout management + * @param transactionTimeout Timeout in milliseconds before auto-rollback + */ + public UnmanagedTransaction(final String transactionId, + final TransactionManager transactionManager, + final Graph graph, + final ScheduledExecutorService scheduledExecutorService, + final long transactionTimeout) { + logger.debug("New transaction context established for {}", transactionId); + this.transactionId = transactionId; + this.manager = transactionManager; + this.graph = graph; + this.scheduledExecutorService = scheduledExecutorService; + this.timeout = transactionTimeout; + + // Create single-threaded executor with named thread for debugging + this.executor = Executors.newSingleThreadExecutor( + r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length())))); + } + + /** + * Returns the transaction ID. + */ + public String getTransactionId() { + return transactionId; + } + + /** + * Resets the timeout for this transaction. Called on each request. + */ + public void touch() { + timeoutFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return scheduledExecutorService.schedule(() -> { + logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, timeout); + close(false); + }, timeout, TimeUnit.MILLISECONDS); + }); + } + + /** + * Opens the underlying graph transaction and starts the inactivity timeout. + * Should be called on the transaction's single-threaded executor to preserve + * ThreadLocal affinity. On failure the exception is re-thrown and the caller + * is responsible for cleanup (e.g. 
via {@link #close(boolean)}). + */ + public void open() { + try { + graph.tx().open(); + touch(); + logger.debug("Transaction {} opened", transactionId); + } catch (Exception e) { + logger.warn("Failed to begin transaction {}: {}", transactionId, e.getMessage()); + throw e; + } + } + + /** + * Closes this transaction and releases its resources. When {@code force} is {@code false}, + * any open graph transaction is rolled back before shutdown. When {@code force} is {@code true}, + * the executor is shut down immediately without attempting a rollback. + * + * @param force if {@code true}, skip the rollback attempt and shut down immediately + */ + public synchronized void close(boolean force) { + accepting.set(false); + + // if the transaction has already been removed then there's no need to do this process again. it's possible + // for this to be called at roughly the same time. this prevents close() from being called more than once. + if (manager.get(transactionId).isEmpty()) return; + + if (!force) { + // when not "forced", an open transaction should be rolled back + try { + executor.submit(() -> { + if (graph.tx().isOpen()) { + logger.debug("Rolling back open transaction on {}", transactionId); + graph.tx().rollback(); + } + }).get(1000, TimeUnit.MILLISECONDS); // TODO: wire this to configuredPerGraphCloseTimeout + } catch (Exception ex) { + logger.warn(String.format("An error occurred while attempting rollback on %s ", transactionId), ex); + } + } + + // prevent any additional requests from processing. if the kill was not "forced" then jobs were scheduled to + // try to rollback open transactions. those jobs either timed-out or completed successfully. 
either way, no + // additional jobs will be allowed, running jobs will be cancelled (if possible) and any scheduled jobs will + // be cancelled + executor.shutdownNow(); + manager.destroy(transactionId); + Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(true)); + logger.debug("Transaction {} closed", transactionId); + } + + /** + * Submits a task to be executed within this transaction's thread context. + * The task should contain the complete request lifecycle: graph operation, + * error handling, and response writing. + * + * @param task The FutureTask to execute on the transaction thread + * @return Future that can be used for timeout cancellation + * @throws IllegalStateException if the transaction is closed + */ + public Future<?> submit(final FutureTask<Void> task) { + if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed"); + + touch(); + return executor.submit(task); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java index c6bbef3ae9..e65364e505 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java @@ -144,4 +144,82 @@ public class GremlinError { final String message = (t.getMessage() == null) ? t.toString() : t.getMessage(); return new GremlinError(HttpResponseStatus.INTERNAL_SERVER_ERROR, message, "ServerErrorException"); } + + /** + * Creates an error for when a transaction is not found on the server. 
+ * This typically occurs when: + * <ul> + * <li>The transaction ID was never registered (client didn't call begin)</li> + * <li>The transaction timed out and was automatically rolled back</li> + * <li>The transaction was already committed or rolled back</li> + * </ul> + * + * @param transactionId The transaction ID that was not found + * @return A GremlinError with appropriate message and status code + */ + public static GremlinError transactionNotFound(final String transactionId) { + final String message = String.format( + "Transaction not found: %s. The transaction may have timed out, already been committed/rolled back, " + + "or was never started. Call g.tx().begin() to start a new transaction.", transactionId); + return new GremlinError(HttpResponseStatus.NOT_FOUND, message, "TransactionException"); + } + + /** + * Creates an error for when commit or rollback is sent without a transaction ID. + * + * @return A GremlinError with appropriate message and status code + */ + public static GremlinError transactionalControlRequiresTransaction() { + final String message = "g.tx().commit() and g.tx().rollback() are only allowed in transactional requests."; + return new GremlinError(HttpResponseStatus.BAD_REQUEST, message, "TransactionException"); + } + + /** + * Creates an error for when a begin request is sent with a user-supplied transaction ID. + * The server generates transaction IDs; clients should not provide them on begin. + * + * @return A GremlinError with appropriate message and status code + */ + public static GremlinError beginHasTransactionId() { + final String message = "Begin transaction request cannot have a user-supplied transactionId"; + return new GremlinError(HttpResponseStatus.BAD_REQUEST, message, "TransactionException"); + } + + /** + * Creates an error for when the maximum number of concurrent transactions is exceeded. 
+ * + * @param exceededErrorMessage The error message containing a maximum number of concurrent transactions + * @return A GremlinError with appropriate message and status code + */ + public static GremlinError maxTransactionsExceeded(String exceededErrorMessage) { + final String message = exceededErrorMessage + + " The server has reached its transaction limit. " + + "Please wait for existing transactions to complete or increase the server's maxConcurrentTransactions setting."; + return new GremlinError(HttpResponseStatus.SERVICE_UNAVAILABLE, message, "TransactionException"); + } + + /** + * Creates an error for when a transaction operation times out. + * + * @param transactionId The transaction ID that timed out + * @param operation The operation that timed out (e.g., "commit", "rollback", "execute") + * @return A GremlinError with appropriate message and status code + */ + public static GremlinError transactionTimeout(final String transactionId, final String operation) { + final String message = String.format( + "Transaction %s timed out during %s operation. The transaction has been rolled back. " + + "Consider increasing the transaction timeout or breaking the operation into smaller parts.", + transactionId, operation); + return new GremlinError(HttpResponseStatus.GATEWAY_TIMEOUT, message, "TransactionException"); + } + + /** + * Creates an error for when the requested graph does not support transactions. 
+ * + * @param uoe The exception stating that transactions aren't supported + * @return A GremlinError with appropriate message and status code + */ + public static GremlinError transactionNotSupported(final UnsupportedOperationException uoe) { + return new GremlinError(HttpResponseStatus.BAD_REQUEST, uoe.getMessage(), "TransactionException"); + } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java index 97a2c8ee4a..3f29a8cad0 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java @@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.server.Channelizer; import org.apache.tinkerpop.gremlin.server.GraphManager; import org.apache.tinkerpop.gremlin.server.GremlinServer; import org.apache.tinkerpop.gremlin.server.Settings; +import org.apache.tinkerpop.gremlin.server.handler.TransactionManager; import org.apache.tinkerpop.gremlin.structure.Graph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public class ServerGremlinExecutor { private static final Logger logger = LoggerFactory.getLogger(ServerGremlinExecutor.class); private final GraphManager graphManager; + private final TransactionManager transactionManager; private final Settings settings; private final List<LifeCycleHook> hooks; @@ -197,6 +199,13 @@ public class ServerGremlinExecutor { .filter(kv -> kv.getValue() instanceof LifeCycleHook) .map(kv -> (LifeCycleHook) kv.getValue()) .collect(Collectors.toList()); + + transactionManager = new TransactionManager( + scheduledExecutorService, + graphManager, + settings.transactionTimeout, + settings.maxConcurrentTransactions + ); } private void registerMetrics(final String engineName) { @@ -236,6 +245,10 @@ public class 
ServerGremlinExecutor { return graphManager; } + public TransactionManager getTransactionManager() { + return transactionManager; + } + public Settings getSettings() { return settings; } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java new file mode 100644 index 0000000000..42be6f23c2 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java @@ -0,0 +1,764 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.http.Consts; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens; +import org.apache.tinkerpop.gremlin.util.Tokens; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.Serializers; +import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; +import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.tinkerpop.gremlin.util.ser.SerTokens.TOKEN_DATA; +import static org.apache.tinkerpop.gremlin.util.ser.SerTokens.TOKEN_RESULT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Server-side integration tests for 
HTTP transaction protocol. + * <p> + * These tests bypass the driver entirely and use a raw Apache HTTP client to hit the server's HTTP endpoint directly. + * This validates that the server returns the correct status codes and error messages independent of any client-side + * guards. + */ +public class GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinServerIntegrationTest { + private final String GTX = "gtx"; + private final ObjectMapper mapper = new ObjectMapper(); + private CloseableHttpClient client; + + @Before + public void createHttpClient() { + client = HttpClients.createDefault(); + } + + @After + public void closeHttpClient() throws Exception { + client.close(); + } + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + final String nameOfTest = name.getMethodName(); + switch (nameOfTest) { + case "shouldRejectRequestWhenMaxConcurrentTransactionsExceeded_22": + settings.maxConcurrentTransactions = 1; + break; + case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34": + settings.maxConcurrentTransactions = 1; + settings.transactionTimeout = 1000; + break; + case "shouldTimeoutIdleTransactionWithNoOperations_30": + settings.transactionTimeout = 1; + break; + case "shouldTimeoutAndRejectLateCommit_32": + settings.transactionTimeout = 1000; + break; + } + return settings; + } + + /** + * Sends a JSON POST request and returns the response. Caller must close the response. + */ + private CloseableHttpResponse postJson(final CloseableHttpClient client, final String json) throws Exception { + final HttpPost post = new HttpPost(TestClientFactory.createURLString()); + post.addHeader("Content-Type", "application/json"); + post.setEntity(new StringEntity(json, Consts.UTF_8)); + return client.execute(post); + } + + /** + * Sends a begin transaction request for the given graph alias and returns the server-generated transaction ID. 
+ */ + private String beginTx(final CloseableHttpClient client, final String graphAlias) throws Exception { + try (final CloseableHttpResponse response = postJson(client, + "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + graphAlias + "\"}")) { + assertEquals(200, response.getStatusLine().getStatusCode()); + final String txIdHeader = response.getFirstHeader("X-Transaction-Id").getValue(); + final String json = EntityUtils.toString(response.getEntity()); + final JsonNode node = mapper.readTree(json); + final String txIdBody = node.get(TOKEN_RESULT). + get(TOKEN_DATA). + get(GraphSONTokens.VALUEPROP).get(0). + get(GraphSONTokens.VALUEPROP).get(1). + asText(); + assertNotNull(txIdHeader); + assertEquals(txIdHeader, txIdBody); + return txIdHeader; + } + } + + /** + * Sends a traversal within an existing transaction. + */ + private CloseableHttpResponse submitInTx(final CloseableHttpClient client, + final String txId, + final String gremlin, + final String graphAlias) + throws Exception { + return postJson(client, + "{\"gremlin\":\"" + gremlin + "\",\"g\":\"" + graphAlias + "\",\"transactionId\":\"" + txId + "\"}"); + } + + /** + * Sends a commit for an existing transaction. + */ + private CloseableHttpResponse commitTx(final CloseableHttpClient client, + final String txId, final String graphAlias) throws Exception { + return postJson(client, + "{\"gremlin\":\"g.tx().commit()\",\"g\":\"" + graphAlias + "\",\"transactionId\":\"" + txId + "\"}"); + } + + /** + * Sends a rollback for an existing transaction. + */ + private CloseableHttpResponse rollbackTx(final CloseableHttpClient client, + final String txId, final String graphAlias) throws Exception { + return postJson(client, + "{\"gremlin\":\"g.tx().rollback()\",\"g\":\"" + graphAlias + "\",\"transactionId\":\"" + txId + "\"}"); + } + + /** + * Sends a non-transactional traversal (no transactionId). 
+ */ + private CloseableHttpResponse submitNonTx(final CloseableHttpClient client, + final String gremlin, final String graphAlias) throws Exception { + return postJson(client, + "{\"gremlin\":\"" + gremlin + "\",\"g\":\"" + graphAlias + "\"}"); + } + + /** + * Extracts the integer count from a typical count() response. + */ + private int extractCount(final CloseableHttpResponse response) throws Exception { + final String json = EntityUtils.toString(response.getEntity()); + final JsonNode node = mapper.readTree(json); + return node.get("result").get(TOKEN_DATA) + .get(GraphSONTokens.VALUEPROP).get(0) + .get(GraphSONTokens.VALUEPROP).intValue(); + } + + /** + * Extracts the status message from the response body's status field. + */ + private String extractStatusMessage(final CloseableHttpResponse response) throws Exception { + final String json = EntityUtils.toString(response.getEntity()); + final JsonNode node = mapper.readTree(json); + return node.get("status").get("message").asText(); + } + + @Test + public void shouldNotBeginTransactionWithUserProvidedId() throws Exception { + final String txId = beginTx(client, GTX); + + try (final CloseableHttpResponse r = postJson(client, + "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\",\"transactionId\":\"" + txId + "\"}")) { + // Depending on whether the transaction is still open on the server when the second request arrives, there + // may be two different errors that the server throws. + assertTrue(r.getStatusLine().getStatusCode() == 404 || r.getStatusLine().getStatusCode() == 400); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Begin transaction request cannot have a user-supplied transactionId") || + msg.contains("Transaction not found")); + } + } + + /** + * Combined test for #10 and #13: After commit, submitting a traversal or another commit + * with the committed transaction's ID returns 404 "Transaction not found". 
+ */ + @Test + public void shouldReturn404ForInvalidCommit_10_13() throws Exception { + // Can't commit on non-existent transaction. + try (final CloseableHttpResponse r = commitTx(client, "fakeId", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + + final String txId = beginTx(client, GTX); + + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV('test')", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + + // #10: submit traversal on committed tx -> 404 + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + + // #13: commit again on committed tx -> 404 + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + } + + /** + * Combined test for #11 and #14: After rollback, submitting a traversal or another rollback + * with the rolled-back transaction's ID returns 404 "Transaction not found". + */ + @Test + public void shouldReturn404ForInvalidRollback_11_14() throws Exception { + // Can't rollback a non-existent transaction. 
+ try (final CloseableHttpResponse r = rollbackTx(client, "fakeId", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + + final String txId = beginTx(client, GTX); + + // add a vertex and rollback + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV('test')", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + try (final CloseableHttpResponse r = rollbackTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + + // #11: submit traversal on rolled-back tx -> 404 + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + final String msg = extractStatusMessage(r); + assertEquals(404, r.getStatusLine().getStatusCode()); + assertTrue(msg.contains("Transaction not found")); + } + + // #14: rollback again on rolled-back tx -> 404 + try (final CloseableHttpResponse r = rollbackTx(client, txId, GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + } + + /** + * Combined test for #20 and #27: Server returns a transaction ID in response to begin, + * and two separate begins produce distinct IDs. + */ + @Test + public void shouldReturnValidTransactionId_20_27() throws Exception { + // #20: begin returns a valid transaction ID + final String txId1 = beginTx(client, GTX); + assertNotNull(txId1); + assertFalse(txId1.isBlank()); + + // #27: second begin returns a different ID + final String txId2 = beginTx(client, GTX); + assertNotNull(txId2); + assertFalse(txId2.isBlank()); + assertNotEquals(txId1, txId2); + } + + /** + * #21: Server returns "Transaction not found" for a fabricated/invalid transaction ID. 
+ */ + @Test + public void shouldReturn404ForInvalidTransactionId_21() throws Exception { + final String fakeTxId = UUID.randomUUID().toString(); + + try (final CloseableHttpResponse r = submitInTx(client, fakeTxId, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + } + + /** + * #22: Server enforces maxConcurrentTransactions and returns error when limit exceeded. + * Configured with maxConcurrentTransactions=1. + */ + @Test + public void shouldRejectRequestWhenMaxConcurrentTransactionsExceeded_22() throws Exception { + // open one transaction (fills the limit) + beginTx(client, GTX); + + // try to open another -- should fail with 503 (SERVICE_UNAVAILABLE) + try (final CloseableHttpResponse r = postJson(client, + "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\"}")) { + assertEquals(503, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Maximum concurrent transactions exceeded")); + } + } + + /** + * #30: Idle transaction with no operations times out and is removed. + */ + @Test + public void shouldTimeoutIdleTransactionWithNoOperations_30() throws Exception { + final String txId = beginTx(client, GTX); + + // wait for the transaction to timeout (configured at 1ms) + Thread.sleep(1000); + + // the transaction should be gone + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + } + + /** + * #32: Late commit after timeout -- timeout fires and rolls back, then client sends commit. + * Server should return 404 and data should not persist. 
+ */ + @Test + public void shouldTimeoutAndRejectLateCommit_32() throws Exception { + final String txId = beginTx(client, GTX); + + // add a vertex + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV('timeout_test')", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + + // wait for timeout (configured at 1000ms) + Thread.sleep(2000); + + // attempt commit -- should fail with 404 + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(r); + assertTrue(msg.contains("Transaction not found")); + } + + // verify data was not persisted + try (final CloseableHttpResponse r = submitNonTx(client, "g.V().hasLabel('timeout_test').count()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + assertEquals(0, extractCount(r)); + } + } + + /** + * #34: Timeout frees a slot under maxConcurrentTransactions. + * Configured with maxConcurrentTransactions=1 and transactionTimeout=1000. + */ + @Test + public void shouldTimeoutFreeSlotUnderMaxConcurrentTransactions_34() throws Exception { + // fill the single slot + beginTx(client, GTX); + + // wait for timeout to reclaim the slot + Thread.sleep(2000); + + // now a new transaction should succeed + final String txId = beginTx(client, GTX); + assertNotNull(txId); + assertFalse(txId.isBlank()); + } + + /** + * #38: Repeated operations on a closed transaction -- after commit, subsequent commit, + * rollback, and traversal all fail with 404. Same for after rollback. 
+ */ + @Test + public void shouldReturn404ForAllOperationsOnClosedTransaction_38() throws Exception { + // --- After commit --- + final String txId1 = beginTx(client, GTX); + try (final CloseableHttpResponse r = submitInTx(client, txId1, "g.addV('test38')", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + try (final CloseableHttpResponse r = commitTx(client, txId1, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + + // rollback-after-commit + try (final CloseableHttpResponse r = rollbackTx(client, txId1, GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + } + // traversal-after-commit + try (final CloseableHttpResponse r = submitInTx(client, txId1, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + } + + // --- After rollback --- + final String txId2 = beginTx(client, GTX); + try (final CloseableHttpResponse r = submitInTx(client, txId2, "g.addV('test38b')", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + try (final CloseableHttpResponse r = rollbackTx(client, txId2, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + + // commit-after-rollback + try (final CloseableHttpResponse r = commitTx(client, txId2, GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + } + // traversal-after-rollback + try (final CloseableHttpResponse r = submitInTx(client, txId2, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + } + } + + /** + * 38a: Traversal queued behind commit on server executor -- from two threads, simultaneously + * send a commit and a traversal for the same transaction. After the commit executes, the + * queued traversal must fail with 404 and no leaked data should exist. 
+ */ + @Test + public void shouldNotLeakDataWhenTraversalQueuedBehindCommit_38a() throws Exception { + final String txId = beginTx(client, GTX); + + // add vertices and an edge in the transaction + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 1)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 2)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + } + + // Fire three requests concurrently: a long traversal to occupy the server executor, then a commit that queues + // behind it, then a short query that queues behind the commit. The short query should fail with 404 because the + // commit closes the transaction first. + final ExecutorService executor = Executors.newFixedThreadPool(3); + try { + final Future<CloseableHttpResponse> longFuture = executor.submit(() -> + submitInTx(client, txId, "g.V().repeat(both()).times(1000)", GTX)); + Thread.sleep(50); + + final Future<CloseableHttpResponse> commitFuture = executor.submit(() -> + commitTx(client, txId, GTX)); + Thread.sleep(50); + + final Future<CloseableHttpResponse> shortFuture = executor.submit(() -> + submitInTx(client, txId, "g.V().count()", GTX)); + + // collect responses + try (final CloseableHttpResponse ignored = longFuture.get(30, TimeUnit.SECONDS)) { + // it doesn't matter what the long traversal returns, only that it ran + } + try (final CloseableHttpResponse commitResp = commitFuture.get(30, TimeUnit.SECONDS)) { + assertEquals(200, commitResp.getStatusLine().getStatusCode()); + } + try (final CloseableHttpResponse shortResp = shortFuture.get(30, TimeUnit.SECONDS)) { + assertEquals(404, shortResp.getStatusLine().getStatusCode()); + final String msg = extractStatusMessage(shortResp); + assertTrue(msg.contains("Transaction not found")); + } + } finally { + executor.shutdownNow(); + } + } + + /** + * #44: The g alias is locked per transaction 
-- sending a request with a valid transaction ID + * but a different g alias should be rejected. The assertion accepts 400, 404, or 500. + */ + @Test + public void shouldRejectMismatchedGraphAliasInTransaction_44() throws Exception { + final String txId = beginTx(client, GTX); + + // send a request with the same txId but a different graph alias + // NOTE(review): the transaction begun above is neither committed nor rolled back in this test -- confirm it is cleaned up elsewhere. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", "gclassic")) { + final int status = r.getStatusLine().getStatusCode(); + assertTrue("Expected error status for alias mismatch, got " + status, + status == 400 || status == 404 || status == 500); + } + } + + /** + * #46: Graph alias is present on the begin request. Also tests that begin with no alias + * or an invalid alias returns an error. + */ + @Test + public void shouldRequireGraphAliasOnBeginRequest_46() throws Exception { + // begin with no g alias -- should default to "g" which doesn't support transactions + try (final CloseableHttpResponse response = postJson(client, + "{\"gremlin\":\"g.tx().begin()\"}")) { + final int status = response.getStatusLine().getStatusCode(); + assertEquals(400, status); + assertTrue(extractStatusMessage(response).contains("Graph does not support transactions")); + } + + // begin with an invalid alias -- should fail with 400 and a "Could not alias" message + try (final CloseableHttpResponse response = postJson(client, + "{\"gremlin\":\"g.tx().begin()\",\"g\":\"nonexistent\"}")) { + final int status = response.getStatusLine().getStatusCode(); + assertEquals(400, status); + assertTrue(extractStatusMessage(response).contains("Could not alias")); + } + + // begin with valid alias -- should succeed (positive case) + final String txId = beginTx(client, GTX); + assertNotNull(txId); + assertFalse(txId.isBlank()); + } + + /** + * #47: Commit and rollback requests must carry the transaction ID. Sending commit/rollback + * without a transaction ID should fail. Sending with a valid ID should succeed. 
+ */ + @Test + public void shouldRequireTransactionIdOnCommitAndRollback_47() throws Exception { + // commit with no transactionId -- the request carries "g.tx().commit()" but no txId, + // so there is no transaction for it to act on. The test expects the server to reject + // it with a 400 whose status message says the operation is only allowed in + // transactional requests. + try (final CloseableHttpResponse response = postJson(client, + "{\"gremlin\":\"g.tx().commit()\",\"g\":\"gtx\"}")) { + // without a transactionId, this must not succeed as a commit + final int status = response.getStatusLine().getStatusCode(); + // the request is expected to be rejected outright rather than + // executed as a non-transactional request + assertEquals(400, status); + assertTrue(extractStatusMessage(response).contains("only allowed in transactional requests")); + } + + // rollback with no transactionId -- same expectation as for commit + try (final CloseableHttpResponse response = postJson(client, + "{\"gremlin\":\"g.tx().rollback()\",\"g\":\"gtx\"}")) { + final int status = response.getStatusLine().getStatusCode(); + assertEquals(400, status); + assertTrue(extractStatusMessage(response).contains("only allowed in transactional requests")); + } + } + + /** + * #53: Begin on a graph that doesn't support transactions fails at begin time. 
+ */ + @Test + public void shouldRejectBeginOnNonTransactionalGraph_53() throws Exception { + // gclassic is backed by TinkerGraph (non-transactional) + try (final CloseableHttpResponse response = postJson(client, + "{\"gremlin\":\"g.tx().begin()\",\"g\":\"gclassic\"}")) { + final int status = response.getStatusLine().getStatusCode(); + assertTrue("Expected error for non-transactional graph, got " + status, + status == 400 || status == 500); + } + } + + /** + * Combined test for #66 and #67: Full begin/addV/commit flow using GraphBinary and GraphSON + * serializers respectively, verifying the transactionId field round-trips correctly. + */ + @Test + public void shouldRoundTripTransactionIdWithGraphBinary_66() throws Exception { + final GraphBinaryMessageSerializerV4 serializer = new GraphBinaryMessageSerializerV4(); + + // begin via GraphBinary + final ByteBuf beginReq = serializer.serializeRequestAsBinary( + RequestMessage.build("g.tx().begin()").addG(GTX).create(), + new UnpooledByteBufAllocator(false)); + final HttpPost beginPost = new HttpPost(TestClientFactory.createURLString()); + beginPost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHBINARY_V4.getValue()); + beginPost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHBINARY_V4.getValue()); + beginPost.setEntity(new ByteArrayEntity(beginReq.array())); + + String txId; + try (final CloseableHttpResponse response = client.execute(beginPost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + final ResponseMessage rm = serializer.readChunk(toByteBuf(response.getEntity()), true); + final List<?> data = (List<?>) rm.getResult().getData(); + assertNotNull(data); + assertTrue(data.size() > 0); + // the data should contain a map with transactionId + final Map<?, ?> map = (java.util.Map<?, ?>) data.get(0); + txId = (String) map.get(Tokens.ARGS_TRANSACTION_ID); + assertNotNull(txId); + assertFalse(txId.isBlank()); + } + + // addV via GraphBinary + final ByteBuf addVReq = 
serializer.serializeRequestAsBinary( + RequestMessage.build("g.addV('binary_test')").addG(GTX).addTransactionId(txId).create(), + new UnpooledByteBufAllocator(false)); + final HttpPost addVPost = new HttpPost(TestClientFactory.createURLString()); + addVPost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHBINARY_V4.getValue()); + addVPost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHBINARY_V4.getValue()); + addVPost.setEntity(new ByteArrayEntity(addVReq.array())); + try (final CloseableHttpResponse response = client.execute(addVPost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + // commit via GraphBinary + final ByteBuf commitReq = serializer.serializeRequestAsBinary( + RequestMessage.build("g.tx().commit()").addG(GTX).addTransactionId(txId).create(), + new UnpooledByteBufAllocator(false)); + final HttpPost commitPost = new HttpPost(TestClientFactory.createURLString()); + commitPost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHBINARY_V4.getValue()); + commitPost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHBINARY_V4.getValue()); + commitPost.setEntity(new ByteArrayEntity(commitReq.array())); + try (final CloseableHttpResponse response = client.execute(commitPost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + // verify data persisted: a non-transactional read should now see exactly one 'binary_test' vertex + try (final CloseableHttpResponse r = submitNonTx(client, "g.V().hasLabel('binary_test').count()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + assertEquals(1, extractCount(r)); + } + } + + /** + * Reads the full content of the given HTTP entity into a byte array and wraps it in a {@link ByteBuf}. + * + * @param httpEntity the response entity to read + * @return a buffer wrapping the entity's bytes + * @throws java.io.IOException propagated from {@code EntityUtils.toByteArray} + */ + private static ByteBuf toByteBuf(final org.apache.http.HttpEntity httpEntity) throws java.io.IOException { + final byte[] asArray = EntityUtils.toByteArray(httpEntity); + return Unpooled.wrappedBuffer(asArray); + } + + /** + * #67: GraphSON counterpart of the GraphBinary round-trip test: runs begin/addV/commit with + * requests serialized as GraphSON and carries the returned transaction ID on each follow-up request. + */ + @Test + public void shouldRoundTripTransactionIdWithGraphSON_67() throws Exception { + final GraphSONMessageSerializerV4 serializer = new GraphSONMessageSerializerV4(); + + // begin via GraphSON + final ByteBuf beginReq = 
serializer.serializeRequestAsBinary( + RequestMessage.build("g.tx().begin()").addG(GTX).create(), + new UnpooledByteBufAllocator(false)); + final HttpPost beginPost = new HttpPost(TestClientFactory.createURLString()); + beginPost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHSON_V4.getValue()); + beginPost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHSON_V4.getValue()); + beginPost.setEntity(new ByteArrayEntity(beginReq.array())); + + String txId; + try (final CloseableHttpResponse response = client.execute(beginPost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(Serializers.GRAPHSON_V4.getValue(), response.getEntity().getContentType().getValue()); + final String json = EntityUtils.toString(response.getEntity()); + final JsonNode node = mapper.readTree(json); + // walk result -> data -> first element -> second value, which holds the transaction ID text + txId = node.get(TOKEN_RESULT).get(TOKEN_DATA) + .get(GraphSONTokens.VALUEPROP).get(0) + .get(GraphSONTokens.VALUEPROP).get(1) + .asText(); + + assertNotNull(txId); + assertFalse(txId.isBlank()); + } + + // addV via GraphSON + final ByteBuf addVReq = serializer.serializeRequestAsBinary( + RequestMessage.build("g.addV('graphson_test')").addG(GTX).addTransactionId(txId).create(), + new UnpooledByteBufAllocator(false)); + final HttpPost addVPost = new HttpPost(TestClientFactory.createURLString()); + addVPost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHSON_V4.getValue()); + addVPost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHSON_V4.getValue()); + addVPost.setEntity(new ByteArrayEntity(addVReq.array())); + try (final CloseableHttpResponse response = client.execute(addVPost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + // commit via GraphSON + final ByteBuf commitReq = serializer.serializeRequestAsBinary( + RequestMessage.build("g.tx().commit()").addG(GTX).addTransactionId(txId).create(), + new UnpooledByteBufAllocator(false)); + final HttpPost commitPost = new HttpPost(TestClientFactory.createURLString()); + 
commitPost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHSON_V4.getValue()); + commitPost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHSON_V4.getValue()); + commitPost.setEntity(new ByteArrayEntity(commitReq.array())); + try (final CloseableHttpResponse response = client.execute(commitPost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + // verify data persisted: a non-transactional read should now see exactly one 'graphson_test' vertex + try (final CloseableHttpResponse r = submitNonTx(client, "g.V().hasLabel('graphson_test').count()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + assertEquals(1, extractCount(r)); + } + } + + /** + * Test #42: The transaction ID is returned in the {@code X-Transaction-Id} response header for + * begin, submit, and commit. For begin and commit the test also reads the ID from the response + * body and checks that it matches; for a traversal submitted inside the transaction only the + * header is checked, because that response body does not carry the transaction ID. + */ + @Test + public void shouldReturnTransactionIdHeader_42() throws Exception { + final String beginJson = "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\"}"; + String txIdFromBegin; + try (final CloseableHttpResponse response = postJson(client, beginJson)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + + // header must be present + assertNotNull(response.getFirstHeader("X-Transaction-Id")); + final String txIdFromHeader = response.getFirstHeader("X-Transaction-Id").getValue(); + assertNotNull(txIdFromHeader); + assertFalse(txIdFromHeader.isBlank()); + + // body must contain the same transaction ID + final String json = EntityUtils.toString(response.getEntity()); + final JsonNode node = mapper.readTree(json); + txIdFromBegin = node.get(TOKEN_RESULT).get(TOKEN_DATA) + .get(GraphSONTokens.VALUEPROP).get(0) + .get(GraphSONTokens.VALUEPROP).get(1) + .asText(); + assertNotNull(txIdFromBegin); + assertFalse(txIdFromBegin.isBlank()); + + assertEquals("Transaction ID in header and body must match on begin", + txIdFromHeader, 
txIdFromBegin); + } + + try (final CloseableHttpResponse response = submitInTx(client, txIdFromBegin, "g.addV('dual_test')", GTX)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Body doesn't contain the Transaction ID for Traversals in transactions, so only the header is checked. + assertEquals("Transaction ID in header must match on submit", txIdFromBegin, response.getFirstHeader("X-Transaction-Id").getValue()); + } + + try (final CloseableHttpResponse response = commitTx(client, txIdFromBegin, GTX)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + + // the commit response carries the transaction ID in the body as well as in the header + final JsonNode jsonResponse = mapper.readTree(EntityUtils.toString(response.getEntity())); + final String txIdFromCommit = jsonResponse.get(TOKEN_RESULT).get(TOKEN_DATA) + .get(GraphSONTokens.VALUEPROP).get(0) + .get(GraphSONTokens.VALUEPROP).get(1) + .asText(); + assertEquals(txIdFromBegin, txIdFromCommit); + + assertEquals("Transaction ID in header must match on commit", + txIdFromBegin, response.getFirstHeader("X-Transaction-Id").getValue()); + } + } + + // TODO: move #39 over to this class, as it is a server-side test rather than a client-side one +} diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java index 4e83032df4..43cc6109d2 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java @@ -95,4 +95,11 @@ public final class Tokens { * identifying the kind of client it came from. */ public static final String ARGS_USER_AGENT = "userAgent"; + + /** + * Argument name for the transaction ID used to track multi-request transactions over HTTP. + * The transaction ID is issued by the server in response to the begin request and must be included + * by the client in every subsequent request within the transaction. 
+ */ + public static final String ARGS_TRANSACTION_ID = "transactionId"; } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java index 30072dd3bd..a64cffada6 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java @@ -40,6 +40,7 @@ public final class RequestMessage { this.fields = fields; this.fields.putIfAbsent(Tokens.ARGS_LANGUAGE, "gremlin-lang"); + this.fields.putIfAbsent(Tokens.ARGS_G, "g"); } /** @@ -165,6 +166,20 @@ public final class RequestMessage { return this; } + /** + * Adds a transaction ID to the request message. The transaction ID is used to track + * multi-request transactions over HTTP. All requests within a transaction must include + * the same transaction ID. A {@code null} argument is rejected with a {@link NullPointerException}. + * + * @param transactionId the unique transaction identifier (typically a UUID); must not be {@code null} + * @return this builder + */ + public Builder addTransactionId(final String transactionId) { + Objects.requireNonNull(transactionId, "transactionId argument cannot be null."); + this.fields.put(Tokens.ARGS_TRANSACTION_ID, transactionId); + return this; + } + /** * Create the request message given the settings provided to the {@link Builder}. 
*/ diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java index bc0212dfdf..caa3def020 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java @@ -338,6 +338,9 @@ public abstract class AbstractGraphSONMessageSerializerV4 extends AbstractMessag if (data.containsKey(Tokens.BULK_RESULTS)) { builder.addBulkResults(Boolean.parseBoolean(data.get(Tokens.BULK_RESULTS).toString())); } + if (data.containsKey(Tokens.ARGS_TRANSACTION_ID)) { + builder.addTransactionId(data.get(Tokens.ARGS_TRANSACTION_ID).toString()); + } return builder.create(); } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java index 4d37f40e33..9397e5afde 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java @@ -73,6 +73,9 @@ public class RequestMessageSerializer { if (fields.containsKey(Tokens.BULK_RESULTS)) { builder.addBulkResults(Boolean.parseBoolean(fields.get(Tokens.BULK_RESULTS).toString())); } + if (fields.containsKey(Tokens.ARGS_TRANSACTION_ID)) { + builder.addTransactionId(fields.get(Tokens.ARGS_TRANSACTION_ID).toString()); + } return builder.create(); } catch (IOException ex) {
