Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 673b1eb59 -> 269f0785f refs/heads/trunk 370e5269b -> 10e9c193b
Add custom JMX enabled executor for UDFs Patch and review by Robert Stupp and Sam Tunnicliffe for CASSANDRA-10026 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/269f0785 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/269f0785 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/269f0785 Branch: refs/heads/cassandra-3.0 Commit: 269f0785f2b1544bca561f19e0b6f2bde1150395 Parents: 673b1eb Author: Sam Tunnicliffe <s...@beobal.com> Authored: Sun Aug 9 11:16:24 2015 +0100 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Mon Aug 10 20:38:47 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../concurrent/NamedThreadFactory.java | 2 +- .../cql3/functions/JavaBasedUDFunction.java | 40 ++++++--------- .../cql3/functions/ScriptBasedUDFunction.java | 44 +++++----------- .../cql3/functions/UDFExecutorService.java | 53 ++++++++++++++++++++ 5 files changed, 81 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f0785/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5d3e0d3..7eff824 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-beta1 + * Add custom JMX enabled executor for UDF sandbox (CASSANDRA-10026) * Fix row deletion bug for Materialized Views (CASSANDRA-10014) * Support mixed-version clusters with Cassandra 2.1 and 2.2 (CASSANDRA-9704) * Fix multiple slices on RowSearchers (CASSANDRA-10002) http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f0785/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index 20570c4..33c80d5 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class NamedThreadFactory implements ThreadFactory { - protected final String id; + public final String id; private final int priority; private final ClassLoader contextClassLoader; private final ThreadGroup threadGroup; http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f0785/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java index 3d8ebd9..126160d 100644 --- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java @@ -25,35 +25,21 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.lang.reflect.InvocationTargetException; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; -import java.net.URLStreamHandler; +import java.net.*; import java.nio.ByteBuffer; -import java.security.CodeSource; -import java.security.PermissionCollection; -import java.security.ProtectionDomain; -import java.security.SecureClassLoader; +import java.security.*; import java.security.cert.Certificate; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.StringTokenizer; -import java.util.concurrent.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.io.ByteStreams; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.driver.core.DataType; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.marshal.AbstractType; @@ -78,12 +64,14 @@ final class JavaBasedUDFunction extends UDFunction private static final AtomicInteger classSequence = new AtomicInteger(); - private static final JMXEnabledThreadPoolExecutor executor = - new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedFunctions", - Thread.MIN_PRIORITY, - udfClassLoader, - new SecurityThreadGroup("UserDefinedFunctions", null)), - "userfunction"); + // use a JVM standard ExecutorService as DebuggableThreadPoolExecutor references internal + // classes, which triggers AccessControlException from the UDF sandbox + private static final UDFExecutorService executor = + new UDFExecutorService(new NamedThreadFactory("UserDefinedFunctions", + Thread.MIN_PRIORITY, + udfClassLoader, + new SecurityThreadGroup("UserDefinedFunctions", null)), + "userfunction"); private static final EcjTargetClassLoader targetClassLoader = new EcjTargetClassLoader(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f0785/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java index 8b448fe..c4faa72 100644 --- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java @@ -19,36 +19,14 @@ package org.apache.cassandra.cql3.functions; import java.math.BigDecimal; import java.math.BigInteger; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; -import java.net.URLStreamHandler; +import java.net.*; import java.nio.ByteBuffer; -import java.security.AccessControlContext; -import java.security.AccessController; -import java.security.CodeSource; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.security.ProtectionDomain; +import java.security.*; import java.security.cert.Certificate; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutorService; -import javax.script.Bindings; -import javax.script.Compilable; -import javax.script.CompiledScript; -import javax.script.ScriptContext; -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; -import javax.script.SimpleScriptContext; +import javax.script.*; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.marshal.AbstractType; @@ -100,12 +78,14 @@ final class ScriptBasedUDFunction extends UDFunction "com.datastax.driver.core.utils" }; - private static final JMXEnabledThreadPoolExecutor executor = - new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedScriptFunctions", - Thread.MIN_PRIORITY, - udfClassLoader, - new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))), - "userscripts"); + // use a JVM standard ExecutorService as DebuggableThreadPoolExecutor references internal + // classes, which triggers AccessControlException from the UDF sandbox + private static final UDFExecutorService executor = + new UDFExecutorService(new NamedThreadFactory("UserDefinedScriptFunctions", + Thread.MIN_PRIORITY, + udfClassLoader, + new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))), + "userscripts"); static { http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f0785/src/java/org/apache/cassandra/cql3/functions/UDFExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFExecutorService.java b/src/java/org/apache/cassandra/cql3/functions/UDFExecutorService.java new file mode 100644 index 0000000..5e08ad8 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/functions/UDFExecutorService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.functions; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Executor service which exposes stats via JMX, but which doesn't reference + * internal classes in its beforeExecute & afterExecute methods as these are + * forbidden by the UDF execution sandbox + */ +final class UDFExecutorService extends JMXEnabledThreadPoolExecutor +{ + private static int KEEPALIVE = Integer.getInteger("cassandra.udf_executor_thread_keepalive_ms", 30000); + + UDFExecutorService(NamedThreadFactory threadFactory, String jmxPath) + { + super(FBUtilities.getAvailableProcessors(), + KEEPALIVE, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + threadFactory, + jmxPath); + } + + protected void afterExecute(Runnable r, Throwable t) + { + } + + protected void beforeExecute(Thread t, Runnable r) + { + } +}