[ 
https://issues.apache.org/jira/browse/TINKERPOP-2245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17319586#comment-17319586
 ] 

ASF GitHub Bot commented on TINKERPOP-2245:
-------------------------------------------

spmallette commented on a change in pull request #1414:
URL: https://github.com/apache/tinkerpop/pull/1414#discussion_r611800641



##########
File path: gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Handler for websockets to be used with the {@link UnifiedChannelizer}.
+ */
+@ChannelHandler.Sharable
+public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage> {
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedHandler.class);
+
+    protected final Settings settings;
+    protected final GraphManager graphManager;
+    protected final GremlinExecutor gremlinExecutor;
+    protected final ScheduledExecutorService scheduledExecutorService;
+    protected final ExecutorService sessionExecutor;
+    protected final Channelizer channelizer;
+
+    protected final ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
+
+    /**
+     * This may or may not be the full set of invalid binding keys.  It is dependent on the static imports made to
+     * Gremlin Server.  This should get rid of the worst offenders though and provide a good message back to the
+     * calling client.
+     * <p/>
+     * Use of {@code toUpperCase()} on the accessor values of {@link T} solves an issue where the {@code ScriptEngine}
+     * ignores private scope on {@link T} and imports static fields.
+     */
+    protected static final Set<String> INVALID_BINDINGS_KEYS = new HashSet<>();
+
+    static {
+        INVALID_BINDINGS_KEYS.addAll(Arrays.asList(
+                T.id.name(), T.key.name(),
+                T.label.name(), T.value.name(),
+                T.id.getAccessor(), T.key.getAccessor(),
+                T.label.getAccessor(), T.value.getAccessor(),
+                T.id.getAccessor().toUpperCase(), T.key.getAccessor().toUpperCase(),
+                T.label.getAccessor().toUpperCase(), T.value.getAccessor().toUpperCase()));
+
+        for (Column enumItem : Column.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Order enumItem : Order.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Operator enumItem : Operator.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Scope enumItem : Scope.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+
+        for (Pop enumItem : Pop.values()) {
+            INVALID_BINDINGS_KEYS.add(enumItem.name());
+        }
+    }
+
+    public UnifiedHandler(final Settings settings, final GraphManager graphManager,
+                          final GremlinExecutor gremlinExecutor,
+                          final ScheduledExecutorService scheduledExecutorService,
+                          final Channelizer channelizer) {
+        this.settings = settings;
+        this.graphManager = graphManager;
+        this.gremlinExecutor = gremlinExecutor;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.channelizer = channelizer;
+        this.sessionExecutor = gremlinExecutor.getExecutorService();
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final RequestMessage msg) throws Exception {
+        try {
+            try {
+                validateRequest(msg, graphManager);
+            } catch (SessionException we) {
+                ctx.writeAndFlush(we.getResponseMessage());
+                return;
+            }
+
+            final Optional<String> optMultiTaskSession = msg.optionalArgs(Tokens.ARGS_SESSION);
+            final String sessionId = optMultiTaskSession.orElse(UUID.randomUUID().toString());
+
+            // the SessionTask is really a Context from OpProcessor. we still need the GremlinExecutor/ScriptEngine
+            // config that is all rigged up into the server nicely right now so it seemed best to just keep the general
+            // Context object but extend (essentially rename) it to SessionTask so that it better fits the nomenclature
+            // we have here. when we drop OpProcessor stuff and rid ourselves of GremlinExecutor then we can probably
+            // pare down the constructor for SessionTask further.
+            final SessionTask sessionTask = new SessionTask(msg, ctx, settings, graphManager,
+                    gremlinExecutor, scheduledExecutorService);
+
+            if (sessions.containsKey(sessionId)) {
+                final Session session = sessions.get(sessionId);
+
+                // check if the session is bound to this channel, thus one client per session
+                if (!session.isBoundTo(ctx.channel())) {
+                    final String sessionClosedMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                // if the session is done accepting tasks then error time
+                if (!session.submitTask(sessionTask)) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                }
+            } else {
+                // determine the type of session to start - one that processes the current request only and close OR
+                // one that will process this current request and ones that may arrive in the future.
+                final Session session = optMultiTaskSession.isPresent() ?
+                        createMultiTaskSession(sessionTask, sessionId) :
+                        createSingleTaskSession(sessionTask, sessionId);
+
+                // queue the session to startup when a thread is ready to take it
+                final Future<?> sessionFuture = sessionExecutor.submit(session);
+                session.setSessionFuture(sessionFuture);
+                sessions.put(sessionId, session);
+
+                // determine the max session life. for multi that's going to be "session life" and for single that
+                // will be the span of the request timeout
+                final long seto = sessionTask.getRequestTimeout();
+                final long sessionLife = optMultiTaskSession.isPresent() ? settings.sessionLifetimeTimeout : seto;
+
+                // if timeout is enabled when greater than zero schedule up a timeout which is a session life timeout
+                // for a multi or technically a request timeout for a single.
+                if (seto > 0) {
+                    final ScheduledFuture<?> sessionCancelFuture =
+                            scheduledExecutorService.schedule(
+                                    () -> session.triggerTimeout(sessionLife, optMultiTaskSession.isPresent()),
+                                    sessionLife, TimeUnit.MILLISECONDS);
+                    session.setSessionCancelFuture(sessionCancelFuture);
+                }
+            }
+        } catch (RejectedExecutionException ree) {
+            logger.warn(ree.getMessage());
+
+            // generic message seems ok here? like, you would know what you were submitting on, i.e. session or
+            // sessionless, when you got this error. probably don't need gory details.
+            final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.TOO_MANY_REQUESTS)
+                    .statusMessage("Rate limiting").create();
+            ctx.writeAndFlush(response);
+        } finally {
+            ReferenceCountUtil.release(msg);

Review comment:
       hmm - i think i just copy/pasted from `ExecutorHandler` there. is there a need to `retain()`? perhaps i can just remove the `finally` altogether?
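
For context on the question above: Netty's `SimpleChannelInboundHandler` releases the inbound message itself after `channelRead0` returns when auto-release is on, which is its default, and `ReferenceCountUtil.release(Object)` does nothing for objects that are not `ReferenceCounted`. The sketch below is a Netty-free model of that interaction, not Netty's code and not the PR's code. `AutoReleaseSketch`, `CountedMessage`, and `Body` are hypothetical names made up for illustration, and it assumes `RequestMessage` is a plain object rather than a reference-counted one.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AutoReleaseSketch {

    /** Hypothetical stand-in for a reference-counted message; not Netty's ReferenceCounted. */
    static final class CountedMessage {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        int refCnt() {
            return refCnt.get();
        }

        /** Drops one reference; returns true when the count reaches zero. */
        boolean release() {
            if (refCnt.get() == 0) {
                throw new IllegalStateException("refCnt already 0");
            }
            return refCnt.decrementAndGet() == 0;
        }
    }

    /** Hypothetical callback standing in for the handler's channelRead0 body. */
    interface Body {
        void channelRead0(Object msg);
    }

    /** Models ReferenceCountUtil.release(Object): a no-op returning false for plain objects. */
    static boolean release(final Object msg) {
        return msg instanceof CountedMessage && ((CountedMessage) msg).release();
    }

    /** Models the base handler: run the body, then release the message in a finally block. */
    static void channelRead(final Object msg, final Body body) {
        try {
            body.channelRead0(msg);
        } finally {
            release(msg);
        }
    }

    public static void main(final String[] args) {
        // Plain object: the extra release inside the body changes nothing.
        channelRead(new Object(), AutoReleaseSketch::release);
        System.out.println("plain message: extra release is a no-op");

        // Counted message: the body releases once, then the base handler releases again.
        final CountedMessage counted = new CountedMessage();
        try {
            channelRead(counted, AutoReleaseSketch::release);
        } catch (IllegalStateException e) {
            System.out.println("counted message: " + e.getMessage());
        }
    }
}
```

Under those assumptions the explicit release in the `finally` is redundant for a plain message and would be a double release for a counted one, so dropping the `finally` and leaving the release to the base class looks safe. A `retain()` would only matter if a reference-counted message had to outlive `channelRead0`, for example when handed to another thread.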




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consolidate the executor for bytecode & string based client
> -----------------------------------------------------------
>
>                 Key: TINKERPOP-2245
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-2245
>             Project: TinkerPop
>          Issue Type: Improvement
>          Components: server
>    Affects Versions: 3.4.2
>            Reporter: Divij Vaidya
>            Assignee: Stephen Mallette
>            Priority: Minor
>
> We have two code paths in the server which perform (more or less) the same 
> functions. One is the executor for string based queries and the other is the 
> executor for bytecode. This code can be refactored together so that the logic 
> to handle timeouts, exceptions during execution, exceptions before 
> execution, and other cases can be consolidated. See
> [https://github.com/apache/tinkerpop/blob/master/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java#L246]
> and
> [https://github.com/apache/tinkerpop/blob/master/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java#L333]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
