http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index e2fa270,841e980..89ac0bb
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1941,201 -1711,252 +1941,201 @@@ public class StorageProxy implements St
          }
      }
  
 -    public static List<Row> getRangeSlice(AbstractRangeCommand command, 
ConsistencyLevel consistency_level)
 -    throws UnavailableException, ReadFailureException, ReadTimeoutException
 +    private static class SingleRangeResponse extends 
AbstractIterator<RowIterator> implements PartitionIterator
      {
 -        Tracing.trace("Computing ranges to query");
 -        long startTime = System.nanoTime();
 +        private final ReadCallback handler;
 +        private PartitionIterator result;
  
 -        Keyspace keyspace = Keyspace.open(command.keyspace);
 -        List<Row> rows;
 -        // now scan until we have enough results
 -        try
 +        private SingleRangeResponse(ReadCallback handler)
          {
 -            int liveRowCount = 0;
 -            boolean countLiveRows = command.countCQL3Rows() || 
command.ignoredTombstonedPartitions();
 -            rows = new ArrayList<>();
 +            this.handler = handler;
 +        }
  
 -            // when dealing with LocalStrategy keyspaces, we can skip the 
range splitting and merging (which can be
 -            // expensive in clusters with vnodes)
 -            List<? extends AbstractBounds<RowPosition>> ranges;
 -            if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
 -                ranges = command.keyRange.unwrap();
 -            else
 -                ranges = getRestrictedRanges(command.keyRange);
 -
 -            // determine the number of rows to be fetched and the concurrency 
factor
 -            int rowsToBeFetched = command.limit();
 -            int concurrencyFactor;
 -            if (command.requiresScanningAllRanges())
 -            {
 -                // all nodes must be queried
 -                rowsToBeFetched *= ranges.size();
 -                concurrencyFactor = ranges.size();
 -                logger.debug("Requested rows: {}, ranges.size(): {}; 
concurrent range requests: {}",
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a 
concurrency of {}",
 -                              ranges.size(), concurrencyFactor);
 +        private void waitForResponse() throws ReadTimeoutException
 +        {
 +            if (result != null)
 +                return;
 +
 +            try
 +            {
 +                result = handler.get();
              }
 -            else
 +            catch (DigestMismatchException e)
              {
 -                // our estimate of how many result rows there will be 
per-range
 -                float resultRowsPerRange = 
estimateResultRowsPerRange(command, keyspace);
 -                // underestimate how many rows we will get per-range in order 
to increase the likelihood that we'll
 -                // fetch enough rows in the first round
 -                resultRowsPerRange -= resultRowsPerRange * 
CONCURRENT_SUBREQUESTS_MARGIN;
 -                concurrencyFactor = resultRowsPerRange == 0.0
 -                                  ? 1
 -                                  : Math.max(1, Math.min(ranges.size(), (int) 
Math.ceil(command.limit() / resultRowsPerRange)));
 -
 -                logger.trace("Estimated result rows per range: {}; requested 
rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             resultRowsPerRange,
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a 
concurrency of {} ({} rows per range expected)",
 -                              ranges.size(),
 -                              concurrencyFactor,
 -                              resultRowsPerRange);
 -            }
 -
 -            boolean haveSufficientRows = false;
 -            int i = 0;
 -            AbstractBounds<RowPosition> nextRange = null;
 -            List<InetAddress> nextEndpoints = null;
 -            List<InetAddress> nextFilteredEndpoints = null;
 -            while (i < ranges.size())
 -            {
 -                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, 
Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 -                int concurrentFetchStartingIndex = i;
 -                int concurrentRequests = 0;
 -                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
 -                {
 -                    AbstractBounds<RowPosition> range = nextRange == null
 -                                                      ? ranges.get(i)
 -                                                      : nextRange;
 -                    List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                    ? 
getLiveSortedEndpoints(keyspace, range.right)
 -                                                    : nextEndpoints;
 -                    List<InetAddress> filteredEndpoints = 
nextFilteredEndpoints == null
 -                                                        ? 
consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                        : 
nextFilteredEndpoints;
 -                    ++i;
 -                    ++concurrentRequests;
 -
 -                    // getRestrictedRange has broken the queried range into 
per-[vnode] token ranges, but this doesn't take
 -                    // the replication factor into account. If the 
intersection of live endpoints for 2 consecutive ranges
 -                    // still meets the CL requirements, then we can merge 
both ranges into the same RangeSliceCommand.
 -                    while (i < ranges.size())
 -                    {
 -                        nextRange = ranges.get(i);
 -                        nextEndpoints = getLiveSortedEndpoints(keyspace, 
nextRange.right);
 -                        nextFilteredEndpoints = 
consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                        // If the current range right is the min token, we 
should stop merging because CFS.getRangeSlice
 -                        // don't know how to deal with a wrapping range.
 -                        // Note: it would be slightly more efficient to have 
CFS.getRangeSlice on the destination nodes unwraps
 -                        // the range if necessary and deal with it. However, 
we can't start sending wrapped range without breaking
 -                        // wire compatibility, so It's likely easier not to 
bother;
 -                        if (range.right.isMinimum())
 -                            break;
 -
 -                        List<InetAddress> merged = 
intersection(liveEndpoints, nextEndpoints);
 -
 -                        // Check if there is enough endpoint for the merge to 
be possible.
 -                        if 
(!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                            break;
 -
 -                        List<InetAddress> filteredMerged = 
consistency_level.filterForQuery(keyspace, merged);
 -
 -                        // Estimate whether merging will be a win or not
 -                        if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged,
 filteredEndpoints, nextFilteredEndpoints))
 -                            break;
 -
 -                        // If we get there, merge this range and the next one
 -                        range = range.withNewRight(nextRange.right);
 -                        liveEndpoints = merged;
 -                        filteredEndpoints = filteredMerged;
 -                        ++i;
 -                    }
 +                throw new AssertionError(e); // no digests in range slices yet
 +            }
 +        }
  
 -                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 +        protected RowIterator computeNext()
 +        {
 +            waitForResponse();
 +            return result.hasNext() ? result.next() : endOfData();
 +        }
  
 -                    // collect replies and resolve according to consistency 
level
 -                    RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                    List<InetAddress> minimalEndpoints = 
filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), 
consistency_level.blockFor(keyspace)));
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = 
new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                    handler.assureSufficientLiveNodes();
 -                    resolver.setSources(filteredEndpoints);
 -                    if (filteredEndpoints.size() == 1
 -                        && 
filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
 -                    {
 -                        StageManager.getStage(Stage.READ).execute(new 
LocalRangeSliceRunnable(nodeCmd, handler));
 -                    }
 -                    else
 -                    {
 -                        MessageOut<? extends AbstractRangeCommand> message = 
nodeCmd.createMessage();
 -                        for (InetAddress endpoint : filteredEndpoints)
 -                        {
 -                            Tracing.trace("Enqueuing request to {}", 
endpoint);
 -                            
MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 -                        }
 -                    }
 -                    scanHandlers.add(Pair.create(nodeCmd, handler));
 -                }
 -                Tracing.trace("Submitted {} concurrent range requests 
covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
 +        public void close()
 +        {
 +            if (result != null)
 +                result.close();
 +        }
 +    }
  
 -                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 -                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, 
Iterable<Row>>> cmdPairHandler : scanHandlers)
 -                {
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = 
cmdPairHandler.right;
 -                    RangeSliceResponseResolver resolver = 
(RangeSliceResponseResolver)handler.resolver;
 +    private static class RangeCommandIterator extends 
AbstractIterator<RowIterator> implements PartitionIterator
 +    {
 +        private final Iterator<RangeForQuery> ranges;
 +        private final int totalRangeCount;
 +        private final PartitionRangeReadCommand command;
 +        private final Keyspace keyspace;
 +        private final ConsistencyLevel consistency;
  
 -                    try
 -                    {
 -                        for (Row row : handler.get())
 -                        {
 -                            rows.add(row);
 -                            if (countLiveRows)
 -                                liveRowCount += 
row.getLiveCount(command.predicate, command.timestamp);
 -                        }
 -                        repairResponses.addAll(resolver.repairResults);
 -                    }
 -                    catch (ReadTimeoutException|ReadFailureException ex)
 -                    {
 -                        // we timed out or failed waiting for responses
 -                        int blockFor = consistency_level.blockFor(keyspace);
 -                        int responseCount = resolver.responses.size();
 -                        String gotData = responseCount > 0
 -                                         ? resolver.isDataPresent() ? " 
(including data)" : " (only digests)"
 -                                         : "";
 -
 -                        boolean isTimeout = ex instanceof 
ReadTimeoutException;
 -                        if (Tracing.isTracing())
 -                        {
 -                            Tracing.trace("{}; received {} of {} responses{} 
for range {} of {}",
 -                                          (isTimeout ? "Timed out" : 
"Failed"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        else if (logger.isDebugEnabled())
 -                        {
 -                            logger.debug("Range slice {}; received {} of {} 
responses{} for range {} of {}",
 -                                         (isTimeout ? "timeout" : "failure"), 
responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        throw ex;
 -                    }
 -                    catch (DigestMismatchException e)
 -                    {
 -                        throw new AssertionError(e); // no digests in range 
slices yet
 -                    }
 +        private final long startTime;
 +        private DataLimits.Counter counter;
 +        private PartitionIterator sentQueryIterator;
  
 -                    // if we're done, great, otherwise, move to the next range
 -                    int count = countLiveRows ? liveRowCount : rows.size();
 -                    if (count >= rowsToBeFetched)
 -                    {
 -                        haveSufficientRows = true;
 -                        break;
 -                    }
 -                }
 +        private int concurrencyFactor;
 +        // The two following "metrics" are maintained to improve the concurrencyFactor
 +        // when it was not good enough initially.
 +        private int liveReturned;
 +        private int rangesQueried;
  
 -                try
 -                {
 -                    FBUtilities.waitOnFutures(repairResponses, 
DatabaseDescriptor.getWriteRpcTimeout());
 -                }
 -                catch (TimeoutException ex)
 +        public RangeCommandIterator(RangeIterator ranges, 
PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, 
ConsistencyLevel consistency)
 +        {
 +            this.command = command;
 +            this.concurrencyFactor = concurrencyFactor;
 +            this.startTime = System.nanoTime();
 +            this.ranges = new RangeMerger(ranges, keyspace, consistency);
 +            this.totalRangeCount = ranges.rangeCount();
 +            this.consistency = consistency;
 +            this.keyspace = keyspace;
 +        }
 +
 +        public RowIterator computeNext()
 +        {
 +            while (sentQueryIterator == null || !sentQueryIterator.hasNext())
 +            {
 +                // If we don't have more ranges to handle, we're done
 +                if (!ranges.hasNext())
 +                    return endOfData();
 +
 +                // else, send the next batch of concurrent queries (after having closed the previous iterator)
 +                if (sentQueryIterator != null)
                  {
 -                    // We got all responses, but timed out while repairing
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    if (Tracing.isTracing())
 -                        Tracing.trace("Timed out while read-repairing after 
receiving all {} data and digest responses", blockFor);
 -                    else
 -                        logger.debug("Range slice timeout while 
read-repairing after receiving all {} data and digest responses", blockFor);
 -                    throw new ReadTimeoutException(consistency_level, 
blockFor-1, blockFor, true);
 +                    liveReturned += counter.counted();
 +                    sentQueryIterator.close();
 +
 +                    // It's not the first batch of queries and we're not done, so we can use what has been
 +                    // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
 +                    updateConcurrencyFactor();
                  }
 +                sentQueryIterator = sendNextRequests();
 +            }
 +
 +            return sentQueryIterator.next();
 +        }
 +
 +        private void updateConcurrencyFactor()
 +        {
 +            if (liveReturned == 0)
 +            {
 +                // we haven't actually gotten any results, so query all 
remaining ranges at once
 +                concurrencyFactor = totalRangeCount - rangesQueried;
 +                return;
 +            }
  
 -                if (haveSufficientRows)
 -                    return command.postReconciliationProcessing(rows);
 +            // Otherwise, compute how many rows per range we got on average 
and pick a concurrency factor
 +            // that should allow us to fetch all remaining rows with the next 
batch of (concurrent) queries.
 +            int remainingRows = command.limits().count() - liveReturned;
 +            float rowsPerRange = (float)liveReturned / (float)rangesQueried;
 +            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - 
rangesQueried, Math.round(remainingRows / rowsPerRange)));
 +            logger.trace("Didn't get enough response rows; actual rows per 
range: {}; remaining rows: {}, new concurrent requests: {}",
 +                         rowsPerRange, (int) remainingRows, 
concurrencyFactor);
 +        }
 +
 +        private SingleRangeResponse query(RangeForQuery toQuery)
 +        {
 +            PartitionRangeReadCommand rangeCommand = 
command.forSubRange(toQuery.range);
 +
 +            DataResolver resolver = new DataResolver(keyspace, rangeCommand, 
consistency, toQuery.filteredEndpoints.size());
 +
 +            int blockFor = consistency.blockFor(keyspace);
 +            int minResponses = Math.min(toQuery.filteredEndpoints.size(), 
blockFor);
 +            List<InetAddress> minimalEndpoints = 
toQuery.filteredEndpoints.subList(0, minResponses);
 +            ReadCallback handler = new ReadCallback(resolver, consistency, 
rangeCommand, minimalEndpoints);
  
 -                // we didn't get enough rows in our concurrent fetch; 
recalculate our concurrency factor
 -                // based on the results we've seen so far (as long as we 
still have ranges left to query)
 -                if (i < ranges.size())
 +            handler.assureSufficientLiveNodes();
 +
 +            if (toQuery.filteredEndpoints.size() == 1 && 
canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
 +            {
-                 StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(rangeCommand, handler), Tracing.instance.get());
++                StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(rangeCommand, handler));
 +            }
 +            else
 +            {
 +                for (InetAddress endpoint : toQuery.filteredEndpoints)
                  {
 -                    float fetchedRows = countLiveRows ? liveRowCount : 
rows.size();
 -                    float remainingRows = rowsToBeFetched - fetchedRows;
 -                    float actualRowsPerRange;
 -                    if (fetchedRows == 0.0)
 -                    {
 -                        // we haven't actually gotten any results, so query 
all remaining ranges at once
 -                        actualRowsPerRange = 0.0f;
 -                        concurrencyFactor = ranges.size() - i;
 -                    }
 -                    else
 -                    {
 -                        actualRowsPerRange = fetchedRows / i;
 -                        concurrencyFactor = Math.max(1, 
Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
 -                    }
 -                    logger.trace("Didn't get enough response rows; actual 
rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 -                                 actualRowsPerRange, (int) remainingRows, 
concurrencyFactor);
 +                    MessageOut<ReadCommand> message = 
rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
 +                    Tracing.trace("Enqueuing request to {}", endpoint);
 +                    MessagingService.instance().sendRRWithFailure(message, 
endpoint, handler);
                  }
              }
 +
 +            return new SingleRangeResponse(handler);
          }
 -        finally
 +
 +        private PartitionIterator sendNextRequests()
          {
 -            long latency = System.nanoTime() - startTime;
 -            rangeMetrics.addNano(latency);
 -            
Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency,
 TimeUnit.NANOSECONDS);
 +            List<PartitionIterator> concurrentQueries = new 
ArrayList<>(concurrencyFactor);
 +            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
 +            {
 +                concurrentQueries.add(query(ranges.next()));
 +                ++rangesQueried;
 +            }
 +
 +            Tracing.trace("Submitted {} concurrent range requests", 
concurrentQueries.size());
 +            // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
 +            // enforce any particular limit at this point (this could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
 +            counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
 +            return 
counter.applyTo(PartitionIterators.concat(concurrentQueries));
          }
 -        return command.postReconciliationProcessing(rows);
 +
 +        public void close()
 +        {
 +            try
 +            {
 +                if (sentQueryIterator != null)
 +                    sentQueryIterator.close();
 +            }
 +            finally
 +            {
 +                long latency = System.nanoTime() - startTime;
 +                rangeMetrics.addNano(latency);
 +                
Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency,
 TimeUnit.NANOSECONDS);
 +            }
 +        }
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand 
command, ConsistencyLevel consistencyLevel)
 +    throws UnavailableException, ReadFailureException, ReadTimeoutException
 +    {
 +        Tracing.trace("Computing ranges to query");
 +
 +        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
 +        RangeIterator ranges = new RangeIterator(command, keyspace, 
consistencyLevel);
 +
 +        // our estimate of how many result rows there will be per-range
 +        float resultsPerRange = estimateResultsPerRange(command, keyspace);
 +        // underestimate how many rows we will get per-range in order to 
increase the likelihood that we'll
 +        // fetch enough rows in the first round
 +        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 +        int concurrencyFactor = resultsPerRange == 0.0
 +                              ? 1
 +                              : Math.max(1, Math.min(ranges.rangeCount(), 
(int) Math.ceil(command.limits().count() / resultsPerRange)));
 +        logger.trace("Estimated result rows per range: {}; requested rows: 
{}, ranges.size(): {}; concurrent range requests: {}",
 +                     resultsPerRange, command.limits().count(), 
ranges.rangeCount(), concurrencyFactor);
 +        Tracing.trace("Submitting range requests on {} ranges with a 
concurrency of {} ({} rows per range expected)", ranges.rangeCount(), 
concurrencyFactor, resultsPerRange);
 +
 +        // Note that in general, a RangeCommandIterator will honor the 
command limit for each range, but will not enforce it globally.
 +
 +        return 
command.limits().filter(command.postReconciliationProcessing(new 
RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, 
consistencyLevel)), command.nowInSec());
      }
  
      public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 704a6c9,bcfe871..6d54e36
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@@ -2263,226 -2686,4 +2263,226 @@@ public class UFTest extends CQLTeste
                             "AS 'return 0;'");
          }
      }
 +
 +    @Test
 +    public void testSecurityPermissions() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (key int primary key, dval double)");
 +        execute("INSERT INTO %s (key, dval) VALUES (?, ?)", 1, 1d);
 +
 +        // Java UDFs
 +
 +        try
 +        {
 +            String fName = createFunction(KEYSPACE_PER_TEST, "double",
 +                                          "CREATE OR REPLACE FUNCTION %s(val 
double) " +
 +                                          "RETURNS NULL ON NULL INPUT " +
 +                                          "RETURNS double " +
 +                                          "LANGUAGE JAVA\n" +
 +                                          "AS 
'System.getProperty(\"foo.bar.baz\"); return 0d;';");
 +            execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
 +            Assert.fail();
 +        }
 +        catch (FunctionExecutionException e)
 +        {
 +            
assertAccessControlException("System.getProperty(\"foo.bar.baz\"); return 0d;", 
e);
 +        }
 +
 +        String[][] typesAndSources =
 +        {
 +        {"",                        "try { Class.forName(\"" + 
UDHelper.class.getName() + "\"); } catch (Exception e) { throw new 
RuntimeException(e); } return 0d;"},
 +        {"sun.misc.Unsafe",         "sun.misc.Unsafe.getUnsafe(); return 
0d;"},
 +        {"",                        "try { 
Class.forName(\"sun.misc.Unsafe\"); } catch (Exception e) { throw new 
RuntimeException(e); } return 0d;"},
 +        {"java.nio.file.FileSystems", "try {" +
 +                                    "     
java.nio.file.FileSystems.getDefault(); return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.nio.channels.FileChannel", "try {" +
 +                                    "     
java.nio.channels.FileChannel.open(java.nio.file.FileSystems.getDefault().getPath(\"/etc/passwd\")).close();
 return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.nio.channels.SocketChannel", "try {" +
 +                                    "     
java.nio.channels.SocketChannel.open().close(); return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.io.FileInputStream", "try {" +
 +                                    "     new 
java.io.FileInputStream(\"./foobar\").close(); return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.lang.Runtime",       "try {" +
 +                                    "     java.lang.Runtime.getRuntime(); 
return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"org.apache.cassandra.service.StorageService",
 +                                    "try {" +
 +                                    "     
org.apache.cassandra.service.StorageService v = 
org.apache.cassandra.service.StorageService.instance; v.isInShutdownHook(); 
return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.net.ServerSocket",   "try {" +
 +                                    "     new java.net.ServerSocket().bind(); 
return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.io.FileOutputStream","try {" +
 +                                    "     new 
java.io.FileOutputStream(\".foo\"); return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'},
 +        {"java.lang.Runtime",       "try {" +
 +                                    "     
java.lang.Runtime.getRuntime().exec(\"/tmp/foo\"); return 0d;" +
 +                                    "} catch (Exception t) {" +
 +                                    "     throw new RuntimeException(t);" +
 +                                    '}'}
 +        };
 +
 +        for (String[] typeAndSource : typesAndSources)
 +        {
 +            assertInvalidMessage(typeAndSource[0] + " cannot be resolved",
 +                                 "CREATE OR REPLACE FUNCTION " + KEYSPACE + 
".invalid_class_access(val double) " +
 +                                 "RETURNS NULL ON NULL INPUT " +
 +                                 "RETURNS double " +
 +                                 "LANGUAGE JAVA\n" +
 +                                 "AS '" + typeAndSource[1] + "';");
 +        }
 +
 +        // JavaScript UDFs
 +
 +        try
 +        {
 +            String fName = createFunction(KEYSPACE_PER_TEST, "double",
 +                                          "CREATE OR REPLACE FUNCTION %s(val 
double) " +
 +                                          "RETURNS NULL ON NULL INPUT " +
 +                                          "RETURNS double " +
 +                                          "LANGUAGE javascript\n" +
 +                                          "AS 
'org.apache.cassandra.service.StorageService.instance.isInShutdownHook(); 
0;';");
 +            execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
 +            Assert.fail("Javascript security check failed");
 +        }
 +        catch (FunctionExecutionException e)
 +        {
 +            assertAccessControlException("", e);
 +        }
 +
 +        String[] javascript =
 +        {
 +        "java.lang.management.ManagmentFactory.getThreadMXBean(); 0;",
 +        "new java.io.FileInputStream(\"/tmp/foo\"); 0;",
 +        "new java.io.FileOutputStream(\"/tmp/foo\"); 0;",
 +        
"java.nio.file.FileSystems.getDefault().createFileExclusively(\"./foo_bar_baz\");
 0;",
 +        
"java.nio.channels.FileChannel.open(java.nio.file.FileSystems.getDefault().getPath(\"/etc/passwd\"));
 0;",
 +        "java.nio.channels.SocketChannel.open(); 0;",
 +        "new java.net.ServerSocket().bind(null); 0;",
 +        "var thread = new java.lang.Thread(); thread.start(); 0;",
 +        "java.lang.System.getProperty(\"foo.bar.baz\"); 0;",
 +        "java.lang.Class.forName(\"java.lang.System\"); 0;",
 +        "java.lang.Runtime.getRuntime().exec(\"/tmp/foo\"); 0;",
 +        "java.lang.Runtime.getRuntime().loadLibrary(\"foobar\"); 0;",
 +        "java.lang.Runtime.getRuntime().loadLibrary(\"foobar\"); 0;",
 +        // TODO these (ugly) calls are still possible - these can consume CPU 
(as one could do with an evil loop, too)
 +//        "java.lang.Runtime.getRuntime().traceMethodCalls(true); 0;",
 +//        "java.lang.Runtime.getRuntime().gc(); 0;",
 +//        "java.lang.Runtime.getRuntime(); 0;",
 +        };
 +
 +        for (String script : javascript)
 +        {
 +            try
 +            {
 +                String fName = createFunction(KEYSPACE_PER_TEST, "double",
 +                                              "CREATE OR REPLACE FUNCTION 
%s(val double) " +
 +                                              "RETURNS NULL ON NULL INPUT " +
 +                                              "RETURNS double " +
 +                                              "LANGUAGE javascript\n" +
 +                                              "AS '" + script + "';");
 +                execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
 +                Assert.fail("Javascript security check failed: " + script);
 +            }
 +            catch (FunctionExecutionException e)
 +            {
 +                assertAccessControlException(script, e);
 +            }
 +        }
 +    }
 +
 +    private static void assertAccessControlException(String script, 
FunctionExecutionException e)
 +    { /*
 +       * Returns normally when e itself, or any throwable reachable through its
 +       * cause chain, is an AccessControlException. Otherwise fails the test with
 +       * a message naming the script and the exception that was actually received.
 +       */
 +        for (Throwable t = e; t != null && t != t.getCause(); t = 
t.getCause()) // stops at the end of the chain, or when a throwable reports itself as its own cause
 +            if (t instanceof AccessControlException)
 +                return;
 +        Assert.fail("no AccessControlException for " + script + " (got " + e 
+ ')');
 +    }
 +
 +    @Test
 +    public void testAmokUDF() throws Throwable
 +    { /*
 +       * Checks the UDF warn and fail timeouts against busy-looping functions.
 +       * Each attempt lowers both timeouts, expects a non-empty list of client
 +       * warnings after running a Java UDF that spins for about 110 ms, and then
 +       * expects the SELECT to be rejected with a message containing
 +       * ran longer than 250ms for a Java UDF and a JavaScript UDF that each spin
 +       * for about 500 ms.
 +       * The checks depend on wall-clock timing, so an Error or RuntimeException
 +       * is swallowed and the whole attempt repeated, up to maxTries times.
 +       */
 +        createTable("CREATE TABLE %s (key int primary key, dval double)");
 +        execute("INSERT INTO %s (key, dval) VALUES (?, ?)", 1, 1d);
 +
 +        long udfWarnTimeout = 
DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(); // saved so the finally block can restore it
 +        long udfFailTimeout = 
DatabaseDescriptor.getUserDefinedFunctionFailTimeout(); // saved so the finally block can restore it
 +        int maxTries = 5; // attempts allowed before a failure is rethrown
 +        for (int i = 1; i <= maxTries; i++)
 +        {
 +            try
 +            {
 +                // short timeouts: the fail value is in milliseconds going by the expected message below; the warn value is presumably milliseconds too - confirm
 +                DatabaseDescriptor.setUserDefinedFunctionWarnTimeout(10);
 +                DatabaseDescriptor.setUserDefinedFunctionFailTimeout(250);
 +                // don't kill the unit test... - default policy is "die"
 +                
DatabaseDescriptor.setUserFunctionTimeoutPolicy(Config.UserFunctionTimeoutPolicy.ignore);
 +
-                 ClientWarn.captureWarnings();
++                ClientWarn.instance.captureWarnings();
 +                String fName = createFunction(KEYSPACE_PER_TEST, "double",
 +                                              "CREATE OR REPLACE FUNCTION 
%s(val double) " +
 +                                              "RETURNS NULL ON NULL INPUT " +
 +                                              "RETURNS double " +
 +                                              "LANGUAGE JAVA\n" +
 +                                              "AS 'long 
t=System.currentTimeMillis()+110; while (t>System.currentTimeMillis()) { }; 
return 0d;'");
 +                execute("SELECT " + fName + "(dval) FROM %s WHERE key=1");
-                 List<String> warnings = ClientWarn.getWarnings();
++                List<String> warnings = ClientWarn.instance.getWarnings();
 +                Assert.assertNotNull(warnings);
 +                Assert.assertFalse(warnings.isEmpty());
-                 ClientWarn.resetWarnings();
++                ClientWarn.instance.resetWarnings(); // not reached when one of the two assertions above fails
 +
 +                // Java UDF that spins for about 500 ms, longer than the 250 fail timeout set above
 +
 +                fName = createFunction(KEYSPACE_PER_TEST, "double",
 +                                       "CREATE OR REPLACE FUNCTION %s(val 
double) " +
 +                                       "RETURNS NULL ON NULL INPUT " +
 +                                       "RETURNS double " +
 +                                       "LANGUAGE JAVA\n" +
 +                                       "AS 'long 
t=System.currentTimeMillis()+500; while (t>System.currentTimeMillis()) { }; 
return 0d;';");
 +                assertInvalidMessage("ran longer than 250ms", "SELECT " + 
fName + "(dval) FROM %s WHERE key=1");
 +
 +                // Javascript UDF with the same 500 ms busy loop
 +
 +                fName = createFunction(KEYSPACE_PER_TEST, "double",
 +                                       "CREATE OR REPLACE FUNCTION %s(val 
double) " +
 +                                       "RETURNS NULL ON NULL INPUT " +
 +                                       "RETURNS double " +
 +                                       "LANGUAGE JAVASCRIPT\n" +
 +                                       "AS 'var 
t=java.lang.System.currentTimeMillis()+500; while 
(t>java.lang.System.currentTimeMillis()) { }; 0;';");
 +                assertInvalidMessage("ran longer than 250ms", "SELECT " + 
fName + "(dval) FROM %s WHERE key=1");
 +
 +                return; // every check passed on this attempt, no retry needed
 +            }
 +            catch (Error | RuntimeException e) // Error also covers AssertionError thrown by the Assert calls
 +            {
 +                if (i == maxTries)
 +                    throw e; // last attempt: let the failure surface
 +            }
 +            finally
 +            {
 +                // restore the two saved timeouts; NOTE(review): the timeout policy set to ignore above is not restored here - confirm that is intended
 +                
DatabaseDescriptor.setUserDefinedFunctionWarnTimeout(udfWarnTimeout);
 +                
DatabaseDescriptor.setUserDefinedFunctionFailTimeout(udfFailTimeout);
 +            }
 +        }
 +    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index c3f2629,d22a8f6..cf14d55
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@@ -23,8 -22,11 +23,10 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.cql3.QueryOptions;
  import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.cql3.QueryOptions;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.transport.Message;
  import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.transport.SimpleClient;
@@@ -76,6 -80,62 +78,62 @@@ public class ClientWarningsTest extend
          }
      }
  
+     @Test
+     public void testTombstoneWarning() throws Exception
+     { /*
+        * Writes iterations rows into the single partition pk = 1 through a native
+        * protocol client opened with Server.VERSION_4, flushes the table, deletes
+        * column v from every one of those rows, flushes again, and finally reads
+        * the whole partition, asserting that the response carries exactly one
+        * warning. Going by the test name this is presumably the tombstone
+        * warning; the threshold that triggers it is not visible here - confirm.
+        */
+         final int iterations = 10000; // number of rows written and then deleted
+         createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+ 
+         try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
+         {
+             client.connect(false);
+ 
+             for (int i = 0; i < iterations; i++) // one INSERT per clustering key value
+             {
+                 QueryMessage query = new QueryMessage(String.format("INSERT 
INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)",
+                                                                     KEYSPACE,
+                                                                     
currentTable(),
+                                                                     i), 
QueryOptions.DEFAULT);
+                 client.execute(query);
+             }
+             ColumnFamilyStore store = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+             store.forceBlockingFlush();
+ 
+             for (int i = 0; i < iterations; i++) // one column delete per row written above
+             {
+                 QueryMessage query = new QueryMessage(String.format("DELETE v 
FROM %s.%s WHERE pk = 1 AND ck = %s",
+                                                                     KEYSPACE,
+                                                                     
currentTable(),
+                                                                     i), 
QueryOptions.DEFAULT);
+                 client.execute(query);
+             }
+             store.forceBlockingFlush();
+ 
+             { // bare block: scopes the final read of the partition
+                 QueryMessage query = new QueryMessage(String.format("SELECT * 
FROM %s.%s WHERE pk = 1",
+                                                                     KEYSPACE,
+                                                                     
currentTable()), QueryOptions.DEFAULT);
+                 Message.Response resp = client.execute(query);
+                 assertEquals(1, resp.getWarnings().size());
+             }
+         }
+     }
+ 
+     @Test
+     public void testLargeBatchWithProtoV2() throws Exception
+     { /*
+        * Sends the statement built by createBatchStatement from
+        * DatabaseDescriptor.getBatchSizeWarnThreshold() over an older protocol
+        * version and asserts that the response has no warnings list at all
+        * (getWarnings() returns null).
+        * NOTE(review): the method name says ProtoV2, but the line kept by this
+        * merge connects with Server.VERSION_3; the Server.VERSION_2 line is the
+        * side that the merge removes. Confirm whether the name should follow.
+        */
+         createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
+ 
 -        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
++        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_3))
+         {
+             client.connect(false);
+ 
+             QueryMessage query = new 
QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()),
 QueryOptions.DEFAULT);
+             Message.Response resp = client.execute(query);
+             assertNull(resp.getWarnings()); // null, not an empty list, is what this test requires
+         }
+     }
+ 
      private String createBatchStatement(int minSize)
      {
          return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) 
VALUES (1, '%s') APPLY BATCH;",

Reply via email to