Tongzhou Zhou created SPARK-21899:
-------------------------------------

             Summary: sortBy triggers a new job
                 Key: SPARK-21899
                 URL: https://issues.apache.org/jira/browse/SPARK-21899
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.2.0
            Reporter: Tongzhou Zhou


I'm new to Spark and tried out the sortBy function for a proof-of-concept 
project, but found something odd: sortBy does not behave as a lazy 
transformation as I expected. It triggers a job by itself, and {color:red}the 
later collect call then recomputes everything from the beginning :({color}

The result is correct, but the input is read twice.

I suspect that sortBy runs an action internally. Can anyone clarify this?


Here's my simple Java main function code: 

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String logFile = "input1048576.txt";
        JavaRDD<String> rdd1 = sc.textFile(logFile, 2);
        JavaRDD<String> rdd2 = rdd1.flatMap(record -> Arrays.asList(record.split(" ")).iterator());
        // weird, sortBy involves an action
        JavaRDD<String> rdd3 = rdd2.sortBy(record -> Integer.parseInt(record), true, 2);
        rdd3.collect(); // take action here
        sc.stop();
    }
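
A possible explanation, offered as a sketch rather than a confirmed diagnosis: as far as I understand, an RDD sort in Spark range-partitions the data, and picking the range boundaries requires sampling the input first, which runs as its own job. The plain-Java model below only illustrates that two-pass idea. It is not Spark code, and all names in it (RangeSortSketch, chooseBounds, partitionFor, rangeSort, the sampleStep parameter) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names are Spark APIs.
class RangeSortSketch {

    // Pass 1: inspect a sample of the input (every sampleStep-th record) and
    // pick numPartitions - 1 boundaries that split the sample into ranges.
    static int[] chooseBounds(List<Integer> input, int numPartitions, int sampleStep) {
        List<Integer> sample = new ArrayList<>();
        for (int i = 0; i < input.size(); i += sampleStep) {
            sample.add(input.get(i));
        }
        int[] bounds = new int[numPartitions - 1];
        if (sample.isEmpty()) {
            return bounds; // nothing to sample; boundaries stay at 0
        }
        Integer[] sorted = sample.toArray(new Integer[0]);
        Arrays.sort(sorted);
        for (int p = 1; p < numPartitions; p++) {
            bounds[p - 1] = sorted[p * sorted.length / numPartitions];
        }
        return bounds;
    }

    // A key goes to the first range whose upper boundary is >= the key.
    static int partitionFor(int key, int[] bounds) {
        int p = 0;
        while (p < bounds.length && key > bounds[p]) {
            p++;
        }
        return p;
    }

    // Pass 2: route every record to its range, then sort each range locally.
    // Reading the partitions in order then yields a globally sorted sequence.
    static List<List<Integer>> rangeSort(List<Integer> input, int numPartitions) {
        int[] bounds = chooseBounds(input, numPartitions, 2); // first pass over the input
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            parts.add(new ArrayList<>());
        }
        for (int key : input) {                               // second pass over the input
            parts.get(partitionFor(key, bounds)).add(key);
        }
        for (List<Integer> part : parts) {
            part.sort(null); // natural ordering
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(4, 9, 1, 7, 6, 2, 8, 3);
        System.out.println(rangeSort(input, 2));
    }
}
```

The point of the model is that the boundaries must be known before any record can be routed, so the input is touched once to choose them and once more to do the actual sort.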

{color:red}17/09/02 12:58:06 INFO SparkContext: Starting job: sortBy at 
sparkTest.java:21
17/09/02 12:58:06 INFO DAGScheduler: Got job 0 (sortBy at sparkTest.java:21) 
with 2 output partitions{color}
17/09/02 12:58:06 INFO DAGScheduler: Final stage: ResultStage 0 (sortBy at 
sparkTest.java:21)
17/09/02 12:58:06 INFO DAGScheduler: Parents of final stage: List()
17/09/02 12:58:06 INFO DAGScheduler: Missing parents: List()
17/09/02 12:58:06 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[5] at sortBy at sparkTest.java:21), which has no missing 
parents
17/09/02 12:58:06 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 5.3 KB, free 912.1 MB)
17/09/02 12:58:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 2.9 KB, free 912.1 MB)
17/09/02 12:58:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
192.168.1.17:60007 (size: 2.9 KB, free: 912.3 MB)
17/09/02 12:58:06 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:1006
17/09/02 12:58:06 INFO DAGScheduler: Submitting 2 missing tasks from 
ResultStage 0 (MapPartitionsRDD[5] at sortBy at sparkTest.java:21) (first 15 
tasks are for partitions Vector(0, 1))
17/09/02 12:58:06 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
17/09/02 12:58:06 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 4883 bytes)
17/09/02 12:58:06 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
localhost, executor driver, partition 1, PROCESS_LOCAL, 4883 bytes)
17/09/02 12:58:06 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/09/02 12:58:06 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
{color:red}17/09/02 12:58:06 INFO HadoopRDD: Input split: 
file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:0+5495823
17/09/02 12:58:06 INFO HadoopRDD: Input split: 
file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:5495823+5495824{color}
17/09/02 12:58:06 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1036 
bytes result sent to driver
17/09/02 12:58:06 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) 
in 210 ms on localhost (executor driver) (1/2)
17/09/02 12:58:07 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1722 
bytes result sent to driver
17/09/02 12:58:07 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 1087 ms on localhost (executor driver) (2/2)
17/09/02 12:58:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool 
17/09/02 12:58:07 INFO DAGScheduler: ResultStage 0 (sortBy at 
sparkTest.java:21) finished in 1.163 s
17/09/02 12:58:07 INFO DAGScheduler: Job 0 finished: sortBy at 
sparkTest.java:21, took 1.306695 s
{color:red}17/09/02 12:58:07 INFO SparkContext: Starting job: collect at 
sparkTest.java:23{color}
17/09/02 12:58:07 INFO DAGScheduler: Registering RDD 3 (sortBy at 
sparkTest.java:21)
17/09/02 12:58:07 INFO DAGScheduler: Got job 1 (collect at sparkTest.java:23) 
with 2 output partitions
17/09/02 12:58:07 INFO DAGScheduler: Final stage: ResultStage 2 (collect at 
sparkTest.java:23)
17/09/02 12:58:07 INFO DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 1)
17/09/02 12:58:07 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
17/09/02 12:58:07 INFO DAGScheduler: Submitting ShuffleMapStage 1 
(MapPartitionsRDD[3] at sortBy at sparkTest.java:21), which has no missing 
parents
17/09/02 12:58:07 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 6.1 KB, free 912.1 MB)
17/09/02 12:58:07 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 3.4 KB, free 912.1 MB)
17/09/02 12:58:07 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
192.168.1.17:60007 (size: 3.4 KB, free: 912.3 MB)
17/09/02 12:58:07 INFO SparkContext: Created broadcast 2 from broadcast at 
DAGScheduler.scala:1006
17/09/02 12:58:07 INFO DAGScheduler: Submitting 2 missing tasks from 
ShuffleMapStage 1 (MapPartitionsRDD[3] at sortBy at sparkTest.java:21) (first 
15 tasks are for partitions Vector(0, 1))
17/09/02 12:58:07 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
17/09/02 12:58:07 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 4872 bytes)
17/09/02 12:58:07 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 
localhost, executor driver, partition 1, PROCESS_LOCAL, 4872 bytes)
17/09/02 12:58:07 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
17/09/02 12:58:07 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
{color:red}17/09/02 12:58:07 INFO HadoopRDD: Input split: 
file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:5495823+5495824
17/09/02 12:58:07 INFO HadoopRDD: Input split: 
file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:0+5495823{color}
17/09/02 12:58:07 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 853 
bytes result sent to driver
17/09/02 12:58:07 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) 
in 43 ms on localhost (executor driver) (1/2)
17/09/02 12:58:08 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1025 
bytes result sent to driver
17/09/02 12:58:08 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) 
in 1570 ms on localhost (executor driver) (2/2)
17/09/02 12:58:08 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have 
all completed, from pool 
17/09/02 12:58:08 INFO DAGScheduler: ShuffleMapStage 1 (sortBy at 
sparkTest.java:21) finished in 1.571 s
17/09/02 12:58:08 INFO DAGScheduler: looking for newly runnable stages
17/09/02 12:58:08 INFO DAGScheduler: running: Set()
17/09/02 12:58:08 INFO DAGScheduler: waiting: Set(ResultStage 2)
17/09/02 12:58:08 INFO DAGScheduler: failed: Set()
17/09/02 12:58:08 INFO DAGScheduler: Submitting ResultStage 2 
(MapPartitionsRDD[7] at sortBy at sparkTest.java:21), which has no missing 
parents
17/09/02 12:58:08 INFO MemoryStore: Block broadcast_3 stored as values in 
memory (estimated size 3.9 KB, free 912.0 MB)
17/09/02 12:58:08 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in 
memory (estimated size 2.2 KB, free 912.0 MB)
17/09/02 12:58:08 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
192.168.1.17:60007 (size: 2.2 KB, free: 912.3 MB)
17/09/02 12:58:08 INFO SparkContext: Created broadcast 3 from broadcast at 
DAGScheduler.scala:1006
17/09/02 12:58:08 INFO DAGScheduler: Submitting 2 missing tasks from 
ResultStage 2 (MapPartitionsRDD[7] at sortBy at sparkTest.java:21) (first 15 
tasks are for partitions Vector(0, 1))
17/09/02 12:58:08 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
17/09/02 12:58:08 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, 
localhost, executor driver, partition 0, ANY, 4621 bytes)
17/09/02 12:58:08 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, 
localhost, executor driver, partition 1, ANY, 4621 bytes)
17/09/02 12:58:08 INFO Executor: Running task 0.0 in stage 2.0 (TID 4)
17/09/02 12:58:08 INFO Executor: Running task 1.0 in stage 2.0 (TID 5)
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks 
out of 2 blocks
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks 
out of 2 blocks
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 
5 ms
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 
5 ms
17/09/02 12:58:09 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 
192.168.1.17:60007 in memory (size: 3.4 KB, free: 912.3 MB)
17/09/02 12:58:09 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 
192.168.1.17:60007 in memory (size: 2.9 KB, free: 912.3 MB)
17/09/02 12:58:11 INFO MemoryStore: Block taskresult_5 stored as bytes in 
memory (estimated size 5.7 MB, free 906.3 MB)
17/09/02 12:58:11 INFO BlockManagerInfo: Added taskresult_5 in memory on 
192.168.1.17:60007 (size: 5.7 MB, free: 906.5 MB)
17/09/02 12:58:11 INFO Executor: Finished task 1.0 in stage 2.0 (TID 5). 
6011268 bytes result sent via BlockManager)
17/09/02 12:58:11 INFO TransportClientFactory: Successfully created connection 
to /192.168.1.17:60007 after 31 ms (0 ms spent in bootstraps)
17/09/02 12:58:12 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) 
in 3512 ms on localhost (executor driver) (1/2)
17/09/02 12:58:12 INFO BlockManagerInfo: Removed taskresult_5 on 
192.168.1.17:60007 in memory (size: 5.7 MB, free: 912.3 MB)
17/09/02 12:58:12 INFO MemoryStore: Block taskresult_4 stored as bytes in 
memory (estimated size 6.8 MB, free 905.3 MB)
17/09/02 12:58:12 INFO BlockManagerInfo: Added taskresult_4 in memory on 
192.168.1.17:60007 (size: 6.8 MB, free: 905.5 MB)
17/09/02 12:58:12 INFO Executor: Finished task 0.0 in stage 2.0 (TID 4). 
7143804 bytes result sent via BlockManager)
17/09/02 12:58:12 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) 
in 3661 ms on localhost (executor driver) (2/2)
17/09/02 12:58:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have 
all completed, from pool 
17/09/02 12:58:12 INFO BlockManagerInfo: Removed taskresult_4 on 
192.168.1.17:60007 in memory (size: 6.8 MB, free: 912.3 MB)
17/09/02 12:58:12 INFO DAGScheduler: ResultStage 2 (collect at 
sparkTest.java:23) finished in 3.662 s
17/09/02 12:58:12 INFO DAGScheduler: Job 1 finished: collect at 
sparkTest.java:23, took 5.278293 s
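
Both jobs in the log above print HadoopRDD input splits, so the file appears to be scanned once per job. One commonly suggested way to avoid the second scan is to persist the RDD that feeds sortBy (for example by calling cache() on rdd2), at the cost of holding it in memory; I have not verified this on the run above. The plain-Java model below only illustrates the effect. It is not Spark code, and every name in it (RecomputeSketch, scanInput, cached, sortInTwoPasses) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of lazy recomputation; none of these names are Spark APIs.
class RecomputeSketch {
    static int reads = 0; // how many times the "input file" was scanned

    // Stands in for reading and splitting the input: every call rescans it.
    static List<Integer> scanInput() {
        reads++;
        return Arrays.asList(4, 9, 1, 7, 6, 2, 8, 3);
    }

    // Wraps a source so that its first result is kept and reused afterwards.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean loaded = false;

            @Override
            public T get() {
                if (!loaded) {
                    value = source.get();
                    loaded = true;
                }
                return value;
            }
        };
    }

    // Two passes over the data, like a sampling job followed by the sort job.
    static List<Integer> sortInTwoPasses(Supplier<List<Integer>> data) {
        data.get();                                      // pass 1: stand-in for sampling
        List<Integer> out = new ArrayList<>(data.get()); // pass 2: the actual sort
        out.sort(null);                                  // natural ordering
        return out;
    }

    public static void main(String[] args) {
        sortInTwoPasses(RecomputeSketch::scanInput);
        System.out.println("uncached reads: " + reads);

        reads = 0;
        Supplier<List<Integer>> once = cached(RecomputeSketch::scanInput);
        sortInTwoPasses(once);
        System.out.println("cached reads: " + reads);
    }
}
```

Without the wrapper each pass pulls from the source again, which matches the two sets of input-split lines in the log; with it, the second pass reuses what the first pass loaded.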




