http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index e2fa270,841e980..89ac0bb
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1941,201 -1711,252 +1941,201 @@@ public class StorageProxy implements St
          }
      }

-     public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
-     throws UnavailableException, ReadFailureException, ReadTimeoutException
+     private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
      {
-         Tracing.trace("Computing ranges to query");
-         long startTime = System.nanoTime();
+         private final ReadCallback handler;
+         private PartitionIterator result;

-         Keyspace keyspace = Keyspace.open(command.keyspace);
-         List<Row> rows;
-         // now scan until we have enough results
-         try
+         private SingleRangeResponse(ReadCallback handler)
          {
-             int liveRowCount = 0;
-             boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
-             rows = new ArrayList<>();
+             this.handler = handler;
+         }

-             // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
-             // expensive in clusters with vnodes)
-             List<? extends AbstractBounds<RowPosition>> ranges;
-             if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
-                 ranges = command.keyRange.unwrap();
-             else
-                 ranges = getRestrictedRanges(command.keyRange);
-
-             // determine the number of rows to be fetched and the concurrency factor
-             int rowsToBeFetched = command.limit();
-             int concurrencyFactor;
-             if (command.requiresScanningAllRanges())
-             {
-                 // all nodes must be queried
-                 rowsToBeFetched *= ranges.size();
-                 concurrencyFactor = ranges.size();
-                 logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                              command.limit(),
-                              ranges.size(),
-                              concurrencyFactor);
-                 Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
-                               ranges.size(), concurrencyFactor);
+         private void waitForResponse() throws ReadTimeoutException
+         {
+             if (result != null)
+                 return;
+
+             try
+             {
+                 result = handler.get();
              }
-             else
+             catch (DigestMismatchException e)
              {
-                 // our estimate of how many result rows there will be per-range
-                 float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
-                 // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
-                 // fetch enough rows in the first round
-                 resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
-                 concurrencyFactor = resultRowsPerRange == 0.0
-                                   ? 1
-                                   : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-
-                 logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                              resultRowsPerRange,
-                              command.limit(),
-                              ranges.size(),
-                              concurrencyFactor);
-                 Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
-                               ranges.size(),
-                               concurrencyFactor,
-                               resultRowsPerRange);
-             }
-
-             boolean haveSufficientRows = false;
-             int i = 0;
-             AbstractBounds<RowPosition> nextRange = null;
-             List<InetAddress> nextEndpoints = null;
-             List<InetAddress> nextFilteredEndpoints = null;
-             while (i < ranges.size())
-             {
-                 List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
-                 int concurrentFetchStartingIndex = i;
-                 int concurrentRequests = 0;
-                 while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
-                 {
-                     AbstractBounds<RowPosition> range = nextRange == null
-                                                       ? ranges.get(i)
-                                                       : nextRange;
-                     List<InetAddress> liveEndpoints = nextEndpoints == null
-                                                     ? getLiveSortedEndpoints(keyspace, range.right)
-                                                     : nextEndpoints;
-                     List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
-                                                         ? consistency_level.filterForQuery(keyspace, liveEndpoints)
-                                                         : nextFilteredEndpoints;
-                     ++i;
-                     ++concurrentRequests;
-
-                     // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
-                     // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
-                     // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
-                     while (i < ranges.size())
-                     {
-                         nextRange = ranges.get(i);
-                         nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
-                         nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
-
-                         // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
-                         // don't know how to deal with a wrapping range.
-                         // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
-                         // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
-                         // wire compatibility, so It's likely easier not to bother;
-                         if (range.right.isMinimum())
-                             break;
-
-                         List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
-
-                         // Check if there is enough endpoint for the merge to be possible.
-                         if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
-                             break;
-
-                         List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
-
-                         // Estimate whether merging will be a win or not
-                         if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
-                             break;
-
-                         // If we get there, merge this range and the next one
-                         range = range.withNewRight(nextRange.right);
-                         liveEndpoints = merged;
-                         filteredEndpoints = filteredMerged;
-                         ++i;
-                     }
+                 throw new AssertionError(e); // no digests in range slices yet
+             }
+         }

-                     AbstractRangeCommand nodeCmd = command.forSubRange(range);
+         protected RowIterator computeNext()
+         {
+             waitForResponse();
+             return result.hasNext() ? result.next() : endOfData();
+         }

-                     // collect replies and resolve according to consistency level
-                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
-                     List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
-                     ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
-                     handler.assureSufficientLiveNodes();
-                     resolver.setSources(filteredEndpoints);
-                     if (filteredEndpoints.size() == 1
-                         && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
-                     {
-                         StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
-                     }
-                     else
-                     {
-                         MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
-                         for (InetAddress endpoint : filteredEndpoints)
-                         {
-                             Tracing.trace("Enqueuing request to {}", endpoint);
-                             MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
-                         }
-                     }
-                     scanHandlers.add(Pair.create(nodeCmd, handler));
-                 }
-                 Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
+         public void close()
+         {
+             if (result != null)
+                 result.close();
+         }
+     }
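For readers unfamiliar with the pattern: SingleRangeResponse builds on Guava's AbstractIterator, where computeNext() either produces the next element or calls endOfData() to signal exhaustion. A minimal standalone sketch of that contract (illustrative only, not code from this patch):

    import com.google.common.collect.AbstractIterator;
    import java.util.Queue;

    // Drains a queue lazily; computeNext() returns the next element,
    // or endOfData() once the queue is empty (which ends iteration).
    class QueueIterator<T> extends AbstractIterator<T>
    {
        private final Queue<T> source;

        QueueIterator(Queue<T> source)
        {
            this.source = source;
        }

        protected T computeNext()
        {
            T next = source.poll(); // null means the queue is drained
            return next != null ? next : endOfData();
        }
    }

SingleRangeResponse follows the same shape, except that "producing the next element" first blocks on the ReadCallback until the replica responses arrive.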
-             List<AsyncOneResponse> repairResponses = new ArrayList<>();
-             for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
-             {
-                 ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
-                 RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
+     private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+     {
+         private final Iterator<RangeForQuery> ranges;
+         private final int totalRangeCount;
+         private final PartitionRangeReadCommand command;
+         private final Keyspace keyspace;
+         private final ConsistencyLevel consistency;

-                 try
-                 {
-                     for (Row row : handler.get())
-                     {
-                         rows.add(row);
-                         if (countLiveRows)
-                             liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
-                     }
-                     repairResponses.addAll(resolver.repairResults);
-                 }
-                 catch (ReadTimeoutException|ReadFailureException ex)
-                 {
-                     // we timed out or failed waiting for responses
-                     int blockFor = consistency_level.blockFor(keyspace);
-                     int responseCount = resolver.responses.size();
-                     String gotData = responseCount > 0
-                                    ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
-                                    : "";
-
-                     boolean isTimeout = ex instanceof ReadTimeoutException;
-                     if (Tracing.isTracing())
-                     {
-                         Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
-                                       (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
-                     }
-                     else if (logger.isDebugEnabled())
-                     {
-                         logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
-                                      (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
-                     }
-                     throw ex;
-                 }
-                 catch (DigestMismatchException e)
-                 {
-                     throw new AssertionError(e); // no digests in range slices yet
-                 }
+         private final long startTime;
+         private DataLimits.Counter counter;
+         private PartitionIterator sentQueryIterator;

-                 // if we're done, great, otherwise, move to the next range
-                 int count = countLiveRows ? liveRowCount : rows.size();
-                 if (count >= rowsToBeFetched)
-                 {
-                     haveSufficientRows = true;
-                     break;
-                 }
-             }
+         private int concurrencyFactor;
+         // The following two "metrics" are maintained to improve the concurrencyFactor
+         // when the initial estimate was not good enough.
+         private int liveReturned;
+         private int rangesQueried;

-             try
-             {
-                 FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
-             }
-             catch (TimeoutException ex)
+         public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+         {
+             this.command = command;
+             this.concurrencyFactor = concurrencyFactor;
+             this.startTime = System.nanoTime();
+             this.ranges = new RangeMerger(ranges, keyspace, consistency);
+             this.totalRangeCount = ranges.rangeCount();
+             this.consistency = consistency;
+             this.keyspace = keyspace;
+         }
+
+         public RowIterator computeNext()
+         {
+             while (sentQueryIterator == null || !sentQueryIterator.hasNext())
+             {
+                 // If we don't have more ranges to handle, we're done
+                 if (!ranges.hasNext())
+                     return endOfData();
+
+                 // otherwise, send the next batch of concurrent queries (after closing the previous iterator)
+                 if (sentQueryIterator != null)
                  {
-                 // We got all responses, but timed out while repairing
-                 int blockFor = consistency_level.blockFor(keyspace);
-                 if (Tracing.isTracing())
-                     Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
-                 else
-                     logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
-                 throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
+                     liveReturned += counter.counted();
+                     sentQueryIterator.close();
+
+                     // This is not the first batch of queries and we're not done yet, so we can use what has been
+                     // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
+                     updateConcurrencyFactor();
                  }
+                 sentQueryIterator = sendNextRequests();
+             }
+
+             return sentQueryIterator.next();
+         }
+
+         private void updateConcurrencyFactor()
+         {
+             if (liveReturned == 0)
+             {
+                 // we haven't actually gotten any results, so query all remaining ranges at once
+                 concurrencyFactor = totalRangeCount - rangesQueried;
+                 return;
+             }

-             if (haveSufficientRows)
-                 return command.postReconciliationProcessing(rows);
+             // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+             // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+             int remainingRows = command.limits().count() - liveReturned;
+             float rowsPerRange = (float)liveReturned / (float)rangesQueried;
+             concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+             logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+                          rowsPerRange, (int) remainingRows, concurrencyFactor);
+         }
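For intuition, here is the arithmetic updateConcurrencyFactor performs, extracted into a standalone method (helper and parameter names are ours, not from the patch). With limit=100, liveReturned=30, rangesQueried=10 and totalRangeCount=50, we have rowsPerRange = 3.0 and 70 rows still needed, so the next batch queries min(40, round(70 / 3.0)) = 23 ranges concurrently:

    // Standalone version of the adaptive concurrency-factor arithmetic above.
    static int nextConcurrencyFactor(int limit, int liveReturned, int rangesQueried, int totalRangeCount)
    {
        if (liveReturned == 0)
            return totalRangeCount - rangesQueried; // no data yet: query everything left at once

        int remainingRows = limit - liveReturned;
        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
        // never below 1, never more than the ranges that remain
        return Math.max(1, Math.min(totalRangeCount - rangesQueried,
                                    Math.round(remainingRows / rowsPerRange)));
    }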
+
+         private SingleRangeResponse query(RangeForQuery toQuery)
+         {
+             PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+
+             DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+
+             int blockFor = consistency.blockFor(keyspace);
+             int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
+             List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
+             ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);

-             // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
-             // based on the results we've seen so far (as long as we still have ranges left to query)
-             if (i < ranges.size())
+             handler.assureSufficientLiveNodes();
+
+             if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
+             {
 -                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler), Tracing.instance.get());
++                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
+             }
+             else
+             {
+                 for (InetAddress endpoint : toQuery.filteredEndpoints)
                  {
-                 float fetchedRows = countLiveRows ? liveRowCount : rows.size();
-                 float remainingRows = rowsToBeFetched - fetchedRows;
-                 float actualRowsPerRange;
-                 if (fetchedRows == 0.0)
-                 {
-                     // we haven't actually gotten any results, so query all remaining ranges at once
-                     actualRowsPerRange = 0.0f;
-                     concurrencyFactor = ranges.size() - i;
-                 }
-                 else
-                 {
-                     actualRowsPerRange = fetchedRows / i;
-                     concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
-                 }
-                 logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
-                              actualRowsPerRange, (int) remainingRows, concurrencyFactor);
+                     MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
+                     Tracing.trace("Enqueuing request to {}", endpoint);
+                     MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
                  }
              }
+
+             return new SingleRangeResponse(handler);
          }
-         finally
+
+         private PartitionIterator sendNextRequests()
          {
-             long latency = System.nanoTime() - startTime;
-             rangeMetrics.addNano(latency);
-             Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+             List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+             for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+             {
+                 concurrentQueries.add(query(ranges.next()));
+                 ++rangesQueried;
+             }
+
+             Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+             // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
+             // enforce any particular limit at this point (that could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
+             counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
+             return counter.applyTo(PartitionIterators.concat(concurrentQueries));
          }
-         return command.postReconciliationProcessing(rows);
+
+         public void close()
+         {
+             try
+             {
+                 if (sentQueryIterator != null)
+                     sentQueryIterator.close();
+             }
+             finally
+             {
+                 long latency = System.nanoTime() - startTime;
+                 rangeMetrics.addNano(latency);
+                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+             }
+         }
+     }
+
+     @SuppressWarnings("resource")
+     public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
+     throws UnavailableException, ReadFailureException, ReadTimeoutException
+     {
+         Tracing.trace("Computing ranges to query");
+
+         Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+         RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
+
+         // our estimate of how many result rows there will be per-range
+         float resultsPerRange = estimateResultsPerRange(command, keyspace);
+         // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+         // fetch enough rows in the first round
+         resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+         int concurrencyFactor = resultsPerRange == 0.0
+                               ? 1
+                               : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+         logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                      resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
+         Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
+
+         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+
+         return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
      }

      public Map<String, List<String>> getSchemaVersions()
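The initial concurrency factor in getRangeSlice follows the same idea, but works from an estimate rather than observed results. As standalone arithmetic (names are ours; the value of CONCURRENT_SUBREQUESTS_MARGIN is an assumption here, used only to make the example concrete): with an estimated 10 rows per range, a limit of 100 and 50 ranges, the margin shrinks the estimate to 9.0 rows per range, so the first batch issues ceil(100 / 9.0) = 12 concurrent requests.

    // Standalone version of the initial estimate in getRangeSlice above.
    static int initialConcurrencyFactor(float estimatedResultsPerRange, int limit, int rangeCount)
    {
        final float margin = 0.1f; // assumed value of CONCURRENT_SUBREQUESTS_MARGIN
        // deliberately underestimate rows per range so the first round over-fetches slightly
        float perRange = estimatedResultsPerRange - estimatedResultsPerRange * margin;
        return perRange == 0.0f
             ? 1
             : Math.max(1, Math.min(rangeCount, (int) Math.ceil(limit / perRange)));
    }

Under-estimating per-range yield is a cheap hedge: one extra concurrent sub-request costs little, whereas a second round trip to fetch the shortfall costs a full network round.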
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 704a6c9,bcfe871..6d54e36
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@@ -2263,226 -2686,4 +2263,226 @@@ public class UFTest extends CQLTeste
                   "AS 'return 0;'");
          }
      }
+
+     @Test
+     public void testSecurityPermissions() throws Throwable
+     {
+         createTable("CREATE TABLE %s (key int primary key, dval double)");
+         execute("INSERT INTO %s (key, dval) VALUES (?, ?)", 1, 1d);
+
+         // Java UDFs
+
+         try
+         {
+             String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                           "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                           "RETURNS NULL ON NULL INPUT " +
+                                           "RETURNS double " +
+                                           "LANGUAGE JAVA\n" +
+                                           "AS 'System.getProperty(\"foo.bar.baz\"); return 0d;';");
+             execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
+             Assert.fail();
+         }
+         catch (FunctionExecutionException e)
+         {
+             assertAccessControlException("System.getProperty(\"foo.bar.baz\"); return 0d;", e);
+         }
+
+         String[][] typesAndSources =
+         {
+         {"", "try { Class.forName(\"" + UDHelper.class.getName() + "\"); } catch (Exception e) { throw new RuntimeException(e); } return 0d;"},
+         {"sun.misc.Unsafe", "sun.misc.Unsafe.getUnsafe(); return 0d;"},
+         {"", "try { Class.forName(\"sun.misc.Unsafe\"); } catch (Exception e) { throw new RuntimeException(e); } return 0d;"},
+         {"java.nio.file.FileSystems", "try {" +
+                                       "     java.nio.file.FileSystems.getDefault(); return 0d;" +
+                                       "} catch (Exception t) {" +
+                                       "     throw new RuntimeException(t);" +
+                                       '}'},
+         {"java.nio.channels.FileChannel", "try {" +
+                                           "     java.nio.channels.FileChannel.open(java.nio.file.FileSystems.getDefault().getPath(\"/etc/passwd\")).close(); return 0d;" +
+                                           "} catch (Exception t) {" +
+                                           "     throw new RuntimeException(t);" +
+                                           '}'},
+         {"java.nio.channels.SocketChannel", "try {" +
+                                             "     java.nio.channels.SocketChannel.open().close(); return 0d;" +
+                                             "} catch (Exception t) {" +
+                                             "     throw new RuntimeException(t);" +
+                                             '}'},
+         {"java.io.FileInputStream", "try {" +
+                                     "     new java.io.FileInputStream(\"./foobar\").close(); return 0d;" +
+                                     "} catch (Exception t) {" +
+                                     "     throw new RuntimeException(t);" +
+                                     '}'},
+         {"java.lang.Runtime", "try {" +
+                               "     java.lang.Runtime.getRuntime(); return 0d;" +
+                               "} catch (Exception t) {" +
+                               "     throw new RuntimeException(t);" +
+                               '}'},
+         {"org.apache.cassandra.service.StorageService",
+                               "try {" +
+                               "     org.apache.cassandra.service.StorageService v = org.apache.cassandra.service.StorageService.instance; v.isInShutdownHook(); return 0d;" +
+                               "} catch (Exception t) {" +
+                               "     throw new RuntimeException(t);" +
+                               '}'},
+         {"java.net.ServerSocket", "try {" +
+                                   "     new java.net.ServerSocket().bind(); return 0d;" +
+                                   "} catch (Exception t) {" +
+                                   "     throw new RuntimeException(t);" +
+                                   '}'},
+         {"java.io.FileOutputStream","try {" +
+                                     "     new java.io.FileOutputStream(\".foo\"); return 0d;" +
+                                     "} catch (Exception t) {" +
+                                     "     throw new RuntimeException(t);" +
+                                     '}'},
+         {"java.lang.Runtime", "try {" +
+                               "     java.lang.Runtime.getRuntime().exec(\"/tmp/foo\"); return 0d;" +
+                               "} catch (Exception t) {" +
+                               "     throw new RuntimeException(t);" +
+                               '}'}
+         };
+
+         for (String[] typeAndSource : typesAndSources)
+         {
+             assertInvalidMessage(typeAndSource[0] + " cannot be resolved",
+                                  "CREATE OR REPLACE FUNCTION " + KEYSPACE + ".invalid_class_access(val double) " +
+                                  "RETURNS NULL ON NULL INPUT " +
+                                  "RETURNS double " +
+                                  "LANGUAGE JAVA\n" +
+                                  "AS '" + typeAndSource[1] + "';");
+         }
+
+         // JavaScript UDFs
+
+         try
+         {
+             String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                           "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                           "RETURNS NULL ON NULL INPUT " +
+                                           "RETURNS double " +
+                                           "LANGUAGE javascript\n" +
+                                           "AS 'org.apache.cassandra.service.StorageService.instance.isInShutdownHook(); 0;';");
+             execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
+             Assert.fail("Javascript security check failed");
+         }
+         catch (FunctionExecutionException e)
+         {
+             assertAccessControlException("", e);
+         }
+
+         String[] javascript =
+         {
+         "java.lang.management.ManagementFactory.getThreadMXBean(); 0;",
+         "new java.io.FileInputStream(\"/tmp/foo\"); 0;",
+         "new java.io.FileOutputStream(\"/tmp/foo\"); 0;",
+         "java.nio.file.FileSystems.getDefault().createFileExclusively(\"./foo_bar_baz\"); 0;",
+         "java.nio.channels.FileChannel.open(java.nio.file.FileSystems.getDefault().getPath(\"/etc/passwd\")); 0;",
+         "java.nio.channels.SocketChannel.open(); 0;",
+         "new java.net.ServerSocket().bind(null); 0;",
+         "var thread = new java.lang.Thread(); thread.start(); 0;",
+         "java.lang.System.getProperty(\"foo.bar.baz\"); 0;",
+         "java.lang.Class.forName(\"java.lang.System\"); 0;",
+         "java.lang.Runtime.getRuntime().exec(\"/tmp/foo\"); 0;",
+         "java.lang.Runtime.getRuntime().loadLibrary(\"foobar\"); 0;",
+         // TODO these (ugly) calls are still possible - these can consume CPU (as one could do with an evil loop, too)
+//        "java.lang.Runtime.getRuntime().traceMethodCalls(true); 0;",
+//        "java.lang.Runtime.getRuntime().gc(); 0;",
+//        "java.lang.Runtime.getRuntime(); 0;",
+         };
+
+         for (String script : javascript)
+         {
+             try
+             {
+                 String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                               "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                               "RETURNS NULL ON NULL INPUT " +
+                                               "RETURNS double " +
+                                               "LANGUAGE javascript\n" +
+                                               "AS '" + script + "';");
+                 execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
+                 Assert.fail("Javascript security check failed: " + script);
+             }
+             catch (FunctionExecutionException e)
+             {
+                 assertAccessControlException(script, e);
+             }
+         }
+     }
+
+     private static void assertAccessControlException(String script, FunctionExecutionException e)
+     {
+         for (Throwable t = e; t != null && t != t.getCause(); t = t.getCause())
+             if (t instanceof AccessControlException)
+                 return;
+         Assert.fail("no AccessControlException for " + script + " (got " + e + ')');
+     }
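The helper above simply walks the cause chain looking for a java.security.AccessControlException, which is what the UDF sandbox throws when a blocked operation is attempted. A minimal, self-contained illustration of that mechanism (ours, not from the patch), runnable on the JDK 8 era this commit targets:

    import java.security.AccessControlException;
    import java.security.Permission;
    import java.util.PropertyPermission;

    public class SandboxDemo
    {
        public static void main(String[] args)
        {
            // Deny reads of system properties under "foo."; allow everything else.
            System.setSecurityManager(new SecurityManager()
            {
                @Override
                public void checkPermission(Permission perm)
                {
                    if (perm instanceof PropertyPermission && perm.getName().startsWith("foo."))
                        throw new AccessControlException("denied: " + perm);
                }
            });

            try
            {
                System.getProperty("foo.bar.baz"); // same call the first UDF above makes
            }
            catch (AccessControlException e)
            {
                System.out.println("blocked as expected: " + e.getMessage());
            }
        }
    }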
+
+     @Test
+     public void testAmokUDF() throws Throwable
+     {
+         createTable("CREATE TABLE %s (key int primary key, dval double)");
+         execute("INSERT INTO %s (key, dval) VALUES (?, ?)", 1, 1d);
+
+         long udfWarnTimeout = DatabaseDescriptor.getUserDefinedFunctionWarnTimeout();
+         long udfFailTimeout = DatabaseDescriptor.getUserDefinedFunctionFailTimeout();
+         int maxTries = 5;
+         for (int i = 1; i <= maxTries; i++)
+         {
+             try
+             {
+                 // short timeout
+                 DatabaseDescriptor.setUserDefinedFunctionWarnTimeout(10);
+                 DatabaseDescriptor.setUserDefinedFunctionFailTimeout(250);
+                 // don't kill the unit test... - default policy is "die"
+                 DatabaseDescriptor.setUserFunctionTimeoutPolicy(Config.UserFunctionTimeoutPolicy.ignore);
+
 -                ClientWarn.captureWarnings();
++                ClientWarn.instance.captureWarnings();
+                 String fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                               "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                               "RETURNS NULL ON NULL INPUT " +
+                                               "RETURNS double " +
+                                               "LANGUAGE JAVA\n" +
+                                               "AS 'long t=System.currentTimeMillis()+110; while (t>System.currentTimeMillis()) { }; return 0d;'");
+                 execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
 -                List<String> warnings = ClientWarn.getWarnings();
++                List<String> warnings = ClientWarn.instance.getWarnings();
+                 Assert.assertNotNull(warnings);
+                 Assert.assertFalse(warnings.isEmpty());
 -                ClientWarn.resetWarnings();
++                ClientWarn.instance.resetWarnings();
+
+                 // Java UDF
+
+                 fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                        "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                        "RETURNS NULL ON NULL INPUT " +
+                                        "RETURNS double " +
+                                        "LANGUAGE JAVA\n" +
+                                        "AS 'long t=System.currentTimeMillis()+500; while (t>System.currentTimeMillis()) { }; return 0d;';");
+                 assertInvalidMessage("ran longer than 250ms", "SELECT " + fName + "(dval) FROM %s WHERE key=1");
+
+                 // Javascript UDF
+
+                 fName = createFunction(KEYSPACE_PER_TEST, "double",
+                                        "CREATE OR REPLACE FUNCTION %s(val double) " +
+                                        "RETURNS NULL ON NULL INPUT " +
+                                        "RETURNS double " +
+                                        "LANGUAGE JAVASCRIPT\n" +
+                                        "AS 'var t=java.lang.System.currentTimeMillis()+500; while (t>java.lang.System.currentTimeMillis()) { }; 0;';");
+                 assertInvalidMessage("ran longer than 250ms", "SELECT " + fName + "(dval) FROM %s WHERE key=1");
+
+                 return;
+             }
+             catch (Error | RuntimeException e)
+             {
+                 if (i == maxTries)
+                     throw e;
+             }
+             finally
+             {
+                 // reset to defaults
+                 DatabaseDescriptor.setUserDefinedFunctionWarnTimeout(udfWarnTimeout);
+                 DatabaseDescriptor.setUserDefinedFunctionFailTimeout(udfFailTimeout);
+             }
+         }
+     }
  }
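The test drives a warn threshold (10ms) and a fail threshold (250ms): the 110ms busy-loop UDF only produces a client warning, while the 500ms variants exceed the fail threshold. A rough standalone sketch of that two-stage watchdog shape (ours, not Cassandra's actual UDF timeout machinery):

    import java.util.concurrent.*;

    public class TimeoutDemo
    {
        public static void main(String[] args) throws Exception
        {
            long warnMillis = 10, failMillis = 250; // mirrors the values the test sets
            ExecutorService exec = Executors.newSingleThreadExecutor();
            Future<Double> f = exec.submit(() -> {
                long t = System.currentTimeMillis() + 110; // busy-wait like the first UDF
                while (t > System.currentTimeMillis()) { }
                return 0d;
            });
            try
            {
                f.get(warnMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException warn)
            {
                System.out.println("warn: task ran longer than " + warnMillis + "ms");
                try
                {
                    // keep waiting up to the fail threshold
                    f.get(failMillis - warnMillis, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException fail)
                {
                    f.cancel(true);
                    throw new RuntimeException("fail: task ran longer than " + failMillis + "ms");
                }
            }
            exec.shutdown();
        }
    }

With a 110ms task this prints only the warning and then completes, matching what the test asserts for the first UDF.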
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index c3f2629,d22a8f6..cf14d55
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@@ -23,8 -22,11 +23,10 @@@ import org.junit.BeforeClas
  import org.junit.Test;

  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.cql3.QueryOptions;
  import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.cql3.QueryOptions;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.transport.Message;
  import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.transport.SimpleClient;
@@@ -76,6 -80,62 +78,62 @@@ public class ClientWarningsTest extend
          }
      }

+     @Test
+     public void testTombstoneWarning() throws Exception
+     {
+         final int iterations = 10000;
+         createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
+         {
+             client.connect(false);
+
+             for (int i = 0; i < iterations; i++)
+             {
+                 QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)",
+                                                                     KEYSPACE,
+                                                                     currentTable(),
+                                                                     i), QueryOptions.DEFAULT);
+                 client.execute(query);
+             }
+             ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+             store.forceBlockingFlush();
+
+             for (int i = 0; i < iterations; i++)
+             {
+                 QueryMessage query = new QueryMessage(String.format("DELETE v FROM %s.%s WHERE pk = 1 AND ck = %s",
+                                                                     KEYSPACE,
+                                                                     currentTable(),
+                                                                     i), QueryOptions.DEFAULT);
+                 client.execute(query);
+             }
+             store.forceBlockingFlush();
+
+             {
+                 QueryMessage query = new QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1",
+                                                                     KEYSPACE,
+                                                                     currentTable()), QueryOptions.DEFAULT);
+                 Message.Response resp = client.execute(query);
+                 assertEquals(1, resp.getWarnings().size());
+             }
+         }
+     }
+
+     @Test
+     public void testLargeBatchWithProtoV2() throws Exception
+     {
+         createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
+
 -        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
++        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_3))
+         {
+             client.connect(false);
+
+             QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
+             Message.Response resp = client.execute(query);
+             assertNull(resp.getWarnings());
+         }
+     }
+
      private String createBatchStatement(int minSize)
      {
          return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;",