I am trying out aggregations on a single-node Cassandra 3.1 installation. The node has 4GB RAM. The table being aggregated contains ~450000 rows of information on US domestic flights for a single month (from http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time).

CREATE AGGREGATE flightdata.late_flights(text, decimal)
    SFUNC state_late_flights
    STYPE map<text, frozen<tuple<int, int>>>
    INITCOND {};

The late_flights aggregate uses the state_late_flights() user-defined function, which maintains a map from uniquecarrier to tuple<int,int>. The first int in the tuple is the number of delayed flights for that uniquecarrier; the second int is its total number of flights.
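
To make the intended bookkeeping concrete, here is a minimal sketch of the per-row state update in plain Java, outside Cassandra. It follows the description above rather than the exact UDF body: an int[2] array stands in for tuple<int,int>, and the class name LateFlightsSketch and method name update are hypothetical, used only for illustration.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class LateFlightsSketch {
    // Hypothetical stand-in for the aggregate state: carrier -> {delayed, total}.
    static Map<String, int[]> update(Map<String, int[]> state, String carrier, BigDecimal depDel15) {
        // Rows without a carrier are grouped under "EMPTY" (an assumption of this sketch).
        String key = (carrier == null) ? "EMPTY" : carrier;
        int[] counts = state.computeIfAbsent(key, k -> new int[] {0, 0});
        counts[1] += 1;                        // every row counts as one flight
        if (depDel15 != null && depDel15.signum() > 0) {
            counts[0] += 1;                    // a positive depdel15 marks a delayed flight
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, int[]> state = new HashMap<>();
        update(state, "AA", new BigDecimal("1.00"));
        update(state, "AA", new BigDecimal("0.00"));
        update(state, "DL", null);
        state.forEach((carrier, c) ->
            System.out.println(carrier + ": (" + c[0] + ", " + c[1] + ")"));
    }
}
```

Each call corresponds to one invocation of the state function for one row; the map returned after the last row is what the aggregate reports.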

This aggregation query on a subset of the days of the month works:

   cqlsh:flightdata> select late_flights(uniquecarrier, depdel15) from
   flightsbydate where flightdate in ('2015-09-15', '2015-09-16',
   '2015-09-17', '2015-09-18', '2015-09-19', '2015-09-20', '2015-09-21');

     flightdata.late_flights(uniquecarrier, depdel15)
   
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     {'AA': (2395, 17138), 'AS': (234, 3308), 'B6': (703, 4832), 'DL':
   (1452, 17311), 'EV': (1028, 10502), 'F9': (221, 1837), 'HA': (79,
   1414), 'MQ': (892, 4926), 'NK': (535, 2300), 'OO': (1539, 11299),
   'UA': (1422, 9792), 'VX': (181, 1209), 'WN': (3446, 23659)}

   (1 rows)

   Warnings :
   Aggregation query used on multiple partition keys (IN restriction)


However, the aggregation on all ~450000 rows always fails, sometimes immediately, sometimes after 30-60 seconds:

   cqlsh:flightdata> select late_flights(uniquecarrier, depdel15) from
   flightsbydate;

   Traceback (most recent call last):
     File "CassandraInstall-3.1/bin/cqlsh.py", line 1258, in perform_simple_statement
       result = future.result()
     File "/home/wpl/CassandraInstall-3.1/bin/../lib/cassandra-driver-internal-only-3.0.0-6af642d.zip/cassandra-driver-3.0.0-6af642d/cassandra/cluster.py", line 3122, in result
       raise self._final_exception
   FunctionFailure: code=1400 [User Defined Function failure] message="execution of 'flightdata.state_late_flights[map<text, frozen<tuple<int, int>>>, text, decimal]' failed: java.security.AccessControlException: access denied ("java.io.FilePermission" "/home/wpl/CassandraInstall-3.1/conf/logback.xml" "read")"


While this query runs, CPU utilization is 100% - 120% and peak RAM usage stays below 3.5GB.

In case it is useful, here is the state_late_flights user-defined function:

   cqlsh:flightdata> describe function state_late_flights;

   CREATE FUNCTION flightdata.state_late_flights(state map<text, frozen<tuple<int, int>>>, flid text, fldelay decimal)
        CALLED ON NULL INPUT
        RETURNS map<text, frozen<tuple<int, int>>>
        LANGUAGE java
        AS $$
        com.datastax.driver.core.TupleType tt = com.datastax.driver.core.TupleType.of(
            com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED,
            com.datastax.driver.core.CodecRegistry.DEFAULT_INSTANCE,
            com.datastax.driver.core.DataType.cint(),
            com.datastax.driver.core.DataType.cint());
        com.datastax.driver.core.TupleValue tv = tt.newValue();
        tv.setInt(0, 0);
        tv.setInt(1, 1);
        if (flid == null) {
            state.put("EMPTY", tv);
            return state;
        }
        if (state.get(flid) != null) {
            tv = (com.datastax.driver.core.TupleValue) state.get(flid);
            tv.setInt(1, tv.getInt(1) + 1);
            if (fldelay.compareTo(java.math.BigDecimal.valueOf(0)) == 1) {
                tv.setInt(0, tv.getInt(0) + 1);
            }
        }
        state.put(flid, tv);
        return state;
        $$;


What should I check to investigate this further?
Thanks,
Dinesh.
