I am not an expert, but it seems all your processing is done on node1 while
node2 sits idle.
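
If the cause is data locality (with (symbol, date) as the partition key, all
rows for one query likely sit on a single replica), one thing worth trying is
forcing a shuffle so the downstream tasks can land on both executors. A rough,
untested sketch; "trades" here stands in for the RDD your job builds:

// Rough sketch, untested: repartition() forces a shuffle after the
// Cassandra read, so the aggregation tasks can be scheduled on both
// nodes instead of only the replica holding the partition.
JavaRDD<CassandraRow> spread =
        trades.repartition(sparkContext.defaultParallelism());
LOG.info("partitions after repartition: " + spread.getNumPartitions());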

--
Deepak

On Wed, Jun 15, 2016 at 7:35 PM, Jörn Franke <jornfra...@gmail.com> wrote:

>
> What data volume do you have? Why don't you use the corresponding Cassandra
> functionality directly?
> If you run it once, rather than iteratively in memory, you cannot expect much
> improvement.
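>
> For example, since (symbol, date) is the partition key, a single CQL query
> can serve that whole slice directly; a rough, untested sketch with the
> DataStax Java driver (the session setup is omitted and illustrative):
>
> // Hypothetical: read the one (symbol, date) partition straight from
> // Cassandra; trade_time is the first clustering column, so the range
> // predicate is valid CQL.
> ResultSet rs = session.execute(
>     "SELECT trade_time, reporting_venue, quantity FROM schema.trade" +
>     " WHERE symbol = ? AND date = ? AND trade_time >= ? AND trade_time < ?",
>     symbol, date, timeFrom, timeTill);
>
> The grouping and summing would still be client-side, since Cassandra 3.5 has
> no GROUP BY.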
>
> On 15 Jun 2016, at 16:01, nikita.dobryukha <n.dobryu...@gmail.com> wrote:
>
> We use Cassandra 3.5 + Spark 1.6.1 on a 2-node cluster (8 cores and 1 GB of
> memory per node). There is the following Cassandra table:
>
> CREATE TABLE schema.trade (
>     symbol text,
>     date int,
>     trade_time timestamp,
>     reporting_venue text,
>     trade_id bigint,
>     ref_trade_id bigint,
>     action_type text,
>     price double,
>     quantity int,
>     condition_code text,
>     PRIMARY KEY ((symbol, date), trade_time, trade_id)
> ) WITH compaction = {'class' : 'DateTieredCompactionStrategy',
>     'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};
>
> And I want to calculate the percentage of volume: the sum of all volume from
> trades in the relevant security during the time period, grouped by exchange
> and time bar (1 or 5 minutes); e.g., with a 5-minute bar, a trade at 13:47:12
> falls into the 13:45:00 bar. I've created an example:
>
> void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom,
>                            Timestamp timeTill, Integer barWidth) {
>     // milliseconds per minute, used to truncate trade_time to its bar start
>     final long MILLISECONDS_PER_MINUTE = 60_000L;
>     LOG.info("start");
>     JavaPairRDD<Tuple2<Timestamp, String>, Integer> counts =
>             javaFunctions(sparkContext).cassandraTable("schema", "trade")
>             .filter(row ->
>                     row.getString("symbol").equals(symbol) &&
>                     row.getInt("date").equals(date) &&
>                     row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
>                     row.getDateTime("trade_time").getMillis() < timeTill.getTime())
>             .mapToPair(row ->
>                     new Tuple2<>(
>                             new Tuple2<>(
>                                     // round the trade time down to the start of its bar
>                                     new Timestamp(
>                                             (row.getDateTime("trade_time").getMillis()
>                                                     / (barWidth * MILLISECONDS_PER_MINUTE))
>                                                     * barWidth * MILLISECONDS_PER_MINUTE),
>                                     row.getString("reporting_venue")),
>                             row.getInt("quantity")))
>             .reduceByKey((a, b) -> a + b);
>     LOG.info(counts.collect().toString());
>     LOG.info("finish");
> }
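>
> I also wonder whether pushing the filter down would help; a minimal,
> untested sketch using the connector's where(), which sends the CQL predicate
> to Cassandra instead of pulling every row into Spark and filtering there:
>
> // Hypothetical replacement for the filter() above: let Cassandra serve
> // only the requested (symbol, date) partition and trade_time range.
> JavaRDD<CassandraRow> trades = javaFunctions(sparkContext)
>         .cassandraTable("schema", "trade")
>         .where("symbol = ? AND date = ? AND trade_time >= ? AND trade_time < ?",
>                 symbol, date, timeFrom, timeTill);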
>
> ...
> [2016-06-15 09:25:27.014] [INFO ] [main] [EquityTCAAnalytics] start
> [2016-06-15 09:25:28.000] [INFO ] [main] [NettyUtil] Found Netty's native 
> epoll transport in the classpath, using it
> [2016-06-15 09:25:28.518] [INFO ] [main] [Cluster] New Cassandra host 
> /node1:9042 added
> [2016-06-15 09:25:28.519] [INFO ] [main] [LocalNodeFirstLoadBalancingPolicy] 
> Added host node1 (datacenter1)
> [2016-06-15 09:25:28.519] [INFO ] [main] [Cluster] New Cassandra host 
> /node2:9042 added
> [2016-06-15 09:25:28.520] [INFO ] [main] [CassandraConnector] Connected to 
> Cassandra cluster: Cassandra
> [2016-06-15 09:25:29.115] [INFO ] [main] [SparkContext] Starting job: collect 
> at EquityTCAAnalytics.java:88
> [2016-06-15 09:25:29.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Registering RDD 2 (mapToPair at EquityTCAAnalytics.java:78)
> [2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Got job 0 (collect at EquityTCAAnalytics.java:88) with 5 output partitions
> [2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Final stage: ResultStage 1 (collect at EquityTCAAnalytics.java:88)
> [2016-06-15 09:25:29.389] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Parents of final stage: List(ShuffleMapStage 0)
> [2016-06-15 09:25:29.391] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Missing parents: List(ShuffleMapStage 0)
> [2016-06-15 09:25:29.400] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at 
> EquityTCAAnalytics.java:78), which has no missing parents
> [2016-06-15 09:25:29.594] [INFO ] [dag-scheduler-event-loop] [MemoryStore] 
> Block broadcast_0 stored as values in memory (estimated size 10.8 KB, free 
> 10.8 KB)
> [2016-06-15 09:25:29.642] [INFO ] [dag-scheduler-event-loop] [MemoryStore] 
> Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.4 KB, 
> free 16.3 KB)
> [2016-06-15 09:25:29.647] [INFO ] [dispatcher-event-loop-7] 
> [BlockManagerInfo] Added broadcast_0_piece0 in memory on node2:44871 (size: 
> 5.4 KB, free: 2.4 GB)
> [2016-06-15 09:25:29.650] [INFO ] [dag-scheduler-event-loop] [SparkContext] 
> Created broadcast 0 from broadcast at DAGScheduler.scala:1006
> [2016-06-15 09:25:29.658] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at 
> mapToPair at EquityTCAAnalytics.java:78)
> [2016-06-15 09:25:29.661] [INFO ] [dag-scheduler-event-loop] 
> [TaskSchedulerImpl] Adding task set 0.0 with 5 tasks
> [2016-06-15 09:25:30.006] [INFO ] [dispatcher-event-loop-7] 
> [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) 
> (node1:41122) with ID 0
> [2016-06-15 09:25:30.040] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] 
> Starting task 0.0 in stage 0.0 (TID 0, node1, partition 0,NODE_LOCAL, 11725 
> bytes)
> [2016-06-15 09:25:30.051] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] 
> Starting task 1.0 in stage 0.0 (TID 1, node1, partition 1,NODE_LOCAL, 11317 
> bytes)
> [2016-06-15 09:25:30.054] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] 
> Starting task 2.0 in stage 0.0 (TID 2, node1, partition 2,NODE_LOCAL, 11929 
> bytes)
> [2016-06-15 09:25:30.057] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] 
> Starting task 3.0 in stage 0.0 (TID 3, node1, partition 3,NODE_LOCAL, 11249 
> bytes)
> [2016-06-15 09:25:30.059] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] 
> Starting task 4.0 in stage 0.0 (TID 4, node1, partition 4,NODE_LOCAL, 11560 
> bytes)
> [2016-06-15 09:25:30.077] [INFO ] [dispatcher-event-loop-7] 
> [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) 
> (CassandraCH4.ehubprod.local:33668) with ID 1
> [2016-06-15 09:25:30.111] [INFO ] [dispatcher-event-loop-4] 
> [BlockManagerMasterEndpoint] Registering block manager node1:36512 with 511.1 
> MB RAM, BlockManagerId(0, node1, 36512)
> [2016-06-15 09:25:30.168] [INFO ] [dispatcher-event-loop-3] 
> [BlockManagerMasterEndpoint] Registering block manager 
> CassandraCH4.ehubprod.local:33610 with 511.1 MB RAM, BlockManagerId(1, 
> CassandraCH4.ehubprod.local, 33610)
> [2016-06-15 09:25:30.818] [INFO ] [dispatcher-event-loop-2] 
> [BlockManagerInfo] Added broadcast_0_piece0 in memory on node1:36512 (size: 
> 5.4 KB, free: 511.1 MB)
> [2016-06-15 09:25:36.764] [INFO ] [pool-21-thread-1] [CassandraConnector] 
> Disconnected from Cassandra cluster: Cassandra
> [2016-06-15 09:25:48.914] [INFO ] [task-result-getter-0] [TaskSetManager] 
> Finished task 4.0 in stage 0.0 (TID 4) in 18854 ms on node1 (1/5)
> [2016-06-15 09:25:55.541] [INFO ] [task-result-getter-1] [TaskSetManager] 
> Finished task 2.0 in stage 0.0 (TID 2) in 25489 ms on node1 (2/5)
> [2016-06-15 09:25:57.837] [INFO ] [task-result-getter-2] [TaskSetManager] 
> Finished task 1.0 in stage 0.0 (TID 1) in 27795 ms on node1 (3/5)
> [2016-06-15 09:25:57.931] [INFO ] [task-result-getter-3] [TaskSetManager] 
> Finished task 0.0 in stage 0.0 (TID 0) in 27919 ms on node1 (4/5)
> [2016-06-15 09:26:01.357] [INFO ] [task-result-getter-0] [TaskSetManager] 
> Finished task 3.0 in stage 0.0 (TID 3) in 31302 ms on node1 (5/5)
> [2016-06-15 09:26:01.358] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> ShuffleMapStage 0 (mapToPair at EquityTCAAnalytics.java:78) finished in 
> 31.602 s
> [2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> looking for newly runnable stages
> [2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> running: Set()
> [2016-06-15 09:26:01.360] [INFO ] [task-result-getter-0] [TaskSchedulerImpl] 
> Removed TaskSet 0.0, whose tasks have all completed, from pool
> [2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> waiting: Set(ResultStage 1)
> [2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> failed: Set()
> [2016-06-15 09:26:01.365] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Submitting ResultStage 1 (ShuffledRDD[3] at reduceByKey at 
> EquityTCAAnalytics.java:87), which has no missing parents
> [2016-06-15 09:26:01.373] [INFO ] [dag-scheduler-event-loop] [MemoryStore] 
> Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 
> 19.9 KB)
> [2016-06-15 09:26:01.382] [INFO ] [dag-scheduler-event-loop] [MemoryStore] 
> Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, 
> free 21.9 KB)
> [2016-06-15 09:26:01.383] [INFO ] [dispatcher-event-loop-1] 
> [BlockManagerInfo] Added broadcast_1_piece0 in memory on node2:44871 (size: 
> 2.1 KB, free: 2.4 GB)
> [2016-06-15 09:26:01.384] [INFO ] [dag-scheduler-event-loop] [SparkContext] 
> Created broadcast 1 from broadcast at DAGScheduler.scala:1006
> [2016-06-15 09:26:01.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Submitting 5 missing tasks from ResultStage 1 (ShuffledRDD[3] at reduceByKey 
> at EquityTCAAnalytics.java:87)
> [2016-06-15 09:26:01.386] [INFO ] [dag-scheduler-event-loop] 
> [TaskSchedulerImpl] Adding task set 1.0 with 5 tasks
> [2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] 
> Starting task 0.0 in stage 1.0 (TID 5, node1, partition 0,NODE_LOCAL, 2786 
> bytes)
> [2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] 
> Starting task 1.0 in stage 1.0 (TID 6, node1, partition 1,NODE_LOCAL, 2786 
> bytes)
> [2016-06-15 09:26:01.397] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] 
> Starting task 2.0 in stage 1.0 (TID 7, node1, partition 2,NODE_LOCAL, 2786 
> bytes)
> [2016-06-15 09:26:01.398] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] 
> Starting task 3.0 in stage 1.0 (TID 8, node1, partition 3,NODE_LOCAL, 2786 
> bytes)
> [2016-06-15 09:26:01.406] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] 
> Starting task 4.0 in stage 1.0 (TID 9, node1, partition 4,NODE_LOCAL, 2786 
> bytes)
> [2016-06-15 09:26:01.429] [INFO ] [dispatcher-event-loop-4] 
> [BlockManagerInfo] Added broadcast_1_piece0 in memory on node1:36512 (size: 
> 2.1 KB, free: 511.1 MB)
> [2016-06-15 09:26:01.452] [INFO ] [dispatcher-event-loop-6] 
> [MapOutputTrackerMasterEndpoint] Asked to send map output locations for 
> shuffle 0 to node1:41122
> [2016-06-15 09:26:01.456] [INFO ] [dispatcher-event-loop-6] 
> [MapOutputTrackerMaster] Size of output statuses for shuffle 0 is 161 bytes
> [2016-06-15 09:26:01.526] [INFO ] [task-result-getter-1] [TaskSetManager] 
> Finished task 4.0 in stage 1.0 (TID 9) in 128 ms on node1 (1/5)
> [2016-06-15 09:26:01.575] [INFO ] [task-result-getter-3] [TaskSetManager] 
> Finished task 2.0 in stage 1.0 (TID 7) in 184 ms on node1 (2/5)
> [2016-06-15 09:26:01.580] [INFO ] [task-result-getter-2] [TaskSetManager] 
> Finished task 0.0 in stage 1.0 (TID 5) in 193 ms on node1 (3/5)
> [2016-06-15 09:26:01.589] [INFO ] [task-result-getter-3] [TaskSetManager] 
> Finished task 1.0 in stage 1.0 (TID 6) in 199 ms on node1 (4/5)
> [2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSetManager] 
> Finished task 3.0 in stage 1.0 (TID 8) in 200 ms on node1 (5/5)
> [2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSchedulerImpl] 
> Removed TaskSet 1.0, whose tasks have all completed, from pool
> [2016-06-15 09:26:01.599] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> ResultStage 1 (collect at EquityTCAAnalytics.java:88) finished in 0.202 s
> [2016-06-15 09:26:01.612] [INFO ] [main] [DAGScheduler] Job 0 finished: 
> collect at EquityTCAAnalytics.java:88, took 32.496470 s
> [2016-06-15 09:26:01.634] [INFO ] [main] [EquityTCAAnalytics] [((2016-06-10 
> 13:45:00.0,DA),6944), ((2016-06-10 14:25:00.0,B),5241), ..., ((2016-06-10 
> 10:55:00.0,QD),109080), ((2016-06-10 14:55:00.0,A),1300)]
> [2016-06-15 09:26:01.641] [INFO ] [main] [EquityTCAAnalytics] finish
>
> Is 32.5 s normal?
