[ https://issues.apache.org/jira/browse/SPARK-21899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tongzhou Zhou updated SPARK-21899:
----------------------------------
Target Version/s: 2.2.0, 2.1.0 (was: 2.1.0, 2.2.0)
Description:
I'm new to Spark. While trying out the sortBy function for a proof-of-concept project, I found something odd: sortBy does not behave like a pure transformation, as I expected. It triggers a job, and {color:red}the later collect call then has to recompute everything from the beginning :({color}
The result is correct, but the input is read twice!
I suspect that sortBy calls an action internally. Can anyone clarify this?
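A hedged note on the suspicion above: as far as we understand Spark 2.x, `sortBy` is built on `sortByKey`, which constructs a `RangePartitioner`, and that partitioner samples the input eagerly to choose its partition bounds. The sampling runs as a job of its own, which would match the separate "sortBy" job in the log below. The stdlib-only Java sketch that follows is a simplified single-JVM model of that two-pass idea, not Spark's code: `RangeSortSketch`, `computeBounds`, `partitionFor` and `rangeSort` are hypothetical names, and a seeded uniform random sample stands in for Spark's per-partition sampling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Simplified single-JVM model of a range-partitioned sort (hypothetical names, not Spark's code).
public class RangeSortSketch {

    // Pass 1 (the analogue of the eager "sortBy" job): sample keys, derive numPartitions - 1 bounds.
    static List<Integer> computeBounds(List<Integer> keys, int numPartitions, int sampleSize, long seed) {
        Random rnd = new Random(seed);
        List<Integer> sample = new ArrayList<>();
        for (int i = 0; i < sampleSize && !keys.isEmpty(); i++) {
            sample.add(keys.get(rnd.nextInt(keys.size())));
        }
        Collections.sort(sample);
        List<Integer> bounds = new ArrayList<>();
        for (int p = 1; p < numPartitions && !sample.isEmpty(); p++) {
            bounds.add(sample.get(p * sample.size() / numPartitions));
        }
        return bounds; // non-decreasing, because the sample is sorted
    }

    // First partition whose upper bound is >= key; keys above every bound go to the last one.
    static int partitionFor(int key, List<Integer> bounds) {
        int p = 0;
        while (p < bounds.size() && key > bounds.get(p)) {
            p++;
        }
        return p;
    }

    // Full sort: pass 1 computes bounds, pass 2 routes every key, sorts each range, concatenates.
    static List<Integer> rangeSort(List<Integer> keys, int numPartitions) {
        List<Integer> bounds = computeBounds(keys, numPartitions, 20, 42L);
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            parts.add(new ArrayList<>());
        }
        for (int k : keys) {
            parts.get(partitionFor(k, bounds)).add(k);
        }
        List<Integer> out = new ArrayList<>();
        for (List<Integer> part : parts) {
            Collections.sort(part);
            out.addAll(part);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(42, 7, 19, 3, 88, 56, 21, 7, 64, 1);
        System.out.println(rangeSort(keys, 2));
    }
}
```

Running `main` prints `[1, 3, 7, 7, 19, 21, 42, 56, 64, 88]`. The result does not depend on which bounds the sample yields, because the partitions cover disjoint, ordered ranges; the point is only that the bounds cannot be known without looking at the data first.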
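Here is my simple Java program (sparkTest.java):
    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class sparkTest {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            String logFile = "input1048576.txt";
            // Read the input file into 2 partitions and split each line into space-separated tokens
            JavaRDD<String> rdd1 = sc.textFile(logFile, 2);
            JavaRDD<String> rdd2 = rdd1.flatMap(record -> Arrays.asList(record.split(" ")).iterator());
            // Sort the tokens numerically into 2 partitions; weird: this line already starts a job
            JavaRDD<String> rdd3 = rdd2.sortBy(record -> Integer.parseInt(record), true, 2);
            rdd3.collect(); // the action: starts a second job that reads the input again
            sc.stop();
        }
    }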
{color:red}17/09/02 12:58:06 INFO SparkContext: Starting job: sortBy at sparkTest.java:21
17/09/02 12:58:06 INFO DAGScheduler: Got job 0 (sortBy at sparkTest.java:21) with 2 output partitions{color}
17/09/02 12:58:06 INFO DAGScheduler: Final stage: ResultStage 0 (sortBy at sparkTest.java:21)
17/09/02 12:58:06 INFO DAGScheduler: Parents of final stage: List()
17/09/02 12:58:06 INFO DAGScheduler: Missing parents: List()
17/09/02 12:58:06 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at sortBy at sparkTest.java:21), which has no missing parents
17/09/02 12:58:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.3 KB, free 912.1 MB)
17/09/02 12:58:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.9 KB, free 912.1 MB)
17/09/02 12:58:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.17:60007 (size: 2.9 KB, free: 912.3 MB)
17/09/02 12:58:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
17/09/02 12:58:06 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at sortBy at sparkTest.java:21) (first 15 tasks are for partitions Vector(0, 1))
17/09/02 12:58:06 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
17/09/02 12:58:06 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4883 bytes)
17/09/02 12:58:06 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4883 bytes)
17/09/02 12:58:06 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/09/02 12:58:06 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
{color:red}17/09/02 12:58:06 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:0+5495823
17/09/02 12:58:06 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:5495823+5495824{color}
17/09/02 12:58:06 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1036 bytes result sent to driver
17/09/02 12:58:06 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 210 ms on localhost (executor driver) (1/2)
17/09/02 12:58:07 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1722 bytes result sent to driver
17/09/02 12:58:07 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1087 ms on localhost (executor driver) (2/2)
17/09/02 12:58:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/09/02 12:58:07 INFO DAGScheduler: ResultStage 0 (sortBy at sparkTest.java:21) finished in 1.163 s
17/09/02 12:58:07 INFO DAGScheduler: Job 0 finished: sortBy at sparkTest.java:21, took 1.306695 s
{color:red}17/09/02 12:58:07 INFO SparkContext: Starting job: collect at sparkTest.java:23{color}
17/09/02 12:58:07 INFO DAGScheduler: Registering RDD 3 (sortBy at sparkTest.java:21)
17/09/02 12:58:07 INFO DAGScheduler: Got job 1 (collect at sparkTest.java:23) with 2 output partitions
17/09/02 12:58:07 INFO DAGScheduler: Final stage: ResultStage 2 (collect at sparkTest.java:23)
17/09/02 12:58:07 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
17/09/02 12:58:07 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
17/09/02 12:58:07 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[3] at sortBy at sparkTest.java:21), which has no missing parents
17/09/02 12:58:07 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.1 KB, free 912.1 MB)
17/09/02 12:58:07 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.4 KB, free 912.1 MB)
17/09/02 12:58:07 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.17:60007 (size: 3.4 KB, free: 912.3 MB)
17/09/02 12:58:07 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/09/02 12:58:07 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[3] at sortBy at sparkTest.java:21) (first 15 tasks are for partitions Vector(0, 1))
17/09/02 12:58:07 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
17/09/02 12:58:07 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 4872 bytes)
17/09/02 12:58:07 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, PROCESS_LOCAL, 4872 bytes)
17/09/02 12:58:07 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
17/09/02 12:58:07 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
{color:red}17/09/02 12:58:07 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:5495823+5495824
17/09/02 12:58:07 INFO HadoopRDD: Input split: file:/Users/Joy4fun/Documents/workspace/Joy-app/input1048576.txt:0+5495823{color}
17/09/02 12:58:07 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 853 bytes result sent to driver
17/09/02 12:58:07 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 43 ms on localhost (executor driver) (1/2)
17/09/02 12:58:08 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1025 bytes result sent to driver
17/09/02 12:58:08 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 1570 ms on localhost (executor driver) (2/2)
17/09/02 12:58:08 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/09/02 12:58:08 INFO DAGScheduler: ShuffleMapStage 1 (sortBy at sparkTest.java:21) finished in 1.571 s
17/09/02 12:58:08 INFO DAGScheduler: looking for newly runnable stages
17/09/02 12:58:08 INFO DAGScheduler: running: Set()
17/09/02 12:58:08 INFO DAGScheduler: waiting: Set(ResultStage 2)
17/09/02 12:58:08 INFO DAGScheduler: failed: Set()
17/09/02 12:58:08 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[7] at sortBy at sparkTest.java:21), which has no missing parents
17/09/02 12:58:08 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 912.0 MB)
17/09/02 12:58:08 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 912.0 MB)
17/09/02 12:58:08 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.1.17:60007 (size: 2.2 KB, free: 912.3 MB)
17/09/02 12:58:08 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
17/09/02 12:58:08 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[7] at sortBy at sparkTest.java:21) (first 15 tasks are for partitions Vector(0, 1))
17/09/02 12:58:08 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
17/09/02 12:58:08 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, localhost, executor driver, partition 0, ANY, 4621 bytes)
17/09/02 12:58:08 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, localhost, executor driver, partition 1, ANY, 4621 bytes)
17/09/02 12:58:08 INFO Executor: Running task 0.0 in stage 2.0 (TID 4)
17/09/02 12:58:08 INFO Executor: Running task 1.0 in stage 2.0 (TID 5)
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
17/09/02 12:58:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
17/09/02 12:58:09 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.17:60007 in memory (size: 3.4 KB, free: 912.3 MB)
17/09/02 12:58:09 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.17:60007 in memory (size: 2.9 KB, free: 912.3 MB)
17/09/02 12:58:11 INFO MemoryStore: Block taskresult_5 stored as bytes in memory (estimated size 5.7 MB, free 906.3 MB)
17/09/02 12:58:11 INFO BlockManagerInfo: Added taskresult_5 in memory on 192.168.1.17:60007 (size: 5.7 MB, free: 906.5 MB)
17/09/02 12:58:11 INFO Executor: Finished task 1.0 in stage 2.0 (TID 5). 6011268 bytes result sent via BlockManager)
17/09/02 12:58:11 INFO TransportClientFactory: Successfully created connection to /192.168.1.17:60007 after 31 ms (0 ms spent in bootstraps)
17/09/02 12:58:12 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 3512 ms on localhost (executor driver) (1/2)
17/09/02 12:58:12 INFO BlockManagerInfo: Removed taskresult_5 on 192.168.1.17:60007 in memory (size: 5.7 MB, free: 912.3 MB)
17/09/02 12:58:12 INFO MemoryStore: Block taskresult_4 stored as bytes in memory (estimated size 6.8 MB, free 905.3 MB)
17/09/02 12:58:12 INFO BlockManagerInfo: Added taskresult_4 in memory on 192.168.1.17:60007 (size: 6.8 MB, free: 905.5 MB)
17/09/02 12:58:12 INFO Executor: Finished task 0.0 in stage 2.0 (TID 4). 7143804 bytes result sent via BlockManager)
17/09/02 12:58:12 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 3661 ms on localhost (executor driver) (2/2)
17/09/02 12:58:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
17/09/02 12:58:12 INFO BlockManagerInfo: Removed taskresult_4 on 192.168.1.17:60007 in memory (size: 6.8 MB, free: 912.3 MB)
17/09/02 12:58:12 INFO DAGScheduler: ResultStage 2 (collect at sparkTest.java:23) finished in 3.662 s
17/09/02 12:58:12 INFO DAGScheduler: Job 1 finished: collect at sparkTest.java:23, took 5.278293 s
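The red HadoopRDD lines show both input splits being read in Job 0 and again in Job 1. Assuming the first job is a pass over the data feeding sortBy, one mitigation we would expect to help (not verified against this log) is to cache that RDD, e.g. calling `rdd2.cache()` before `rdd2.sortBy(...)`, so the second job can reuse the partitions computed by the first instead of re-reading the file. The stdlib-only sketch below models only the read counting, not Spark itself: `CacheSketch`, `readInput` and `twoPassSort` are hypothetical names, and the hard-coded token string stands in for `input1048576.txt`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model: a lazy source is re-read on every pass unless it is materialized first.
public class CacheSketch {
    static int reads = 0; // number of times the stand-in "input file" was read

    // Stand-in for sc.textFile(...).flatMap(...): each call re-reads and re-splits the input.
    static List<Integer> readInput() {
        reads++;
        List<Integer> out = new ArrayList<>();
        for (String tok : "42 7 19 3 88".split(" ")) {
            out.add(Integer.parseInt(tok));
        }
        return out;
    }

    // Two passes over the source, mirroring the two jobs in the log.
    static List<Integer> twoPassSort(Supplier<List<Integer>> source) {
        // Pass 1 (like Job 0): look at the data to choose a split point.
        List<Integer> sample = new ArrayList<>(source.get());
        Collections.sort(sample);
        int bound = sample.isEmpty() ? 0 : sample.get(sample.size() / 2);

        // Pass 2 (like Job 1): route values to a low or high range, sort each, concatenate.
        List<Integer> low = new ArrayList<>();
        List<Integer> high = new ArrayList<>();
        for (int v : source.get()) {
            if (v <= bound) {
                low.add(v);
            } else {
                high.add(v);
            }
        }
        Collections.sort(low);
        Collections.sort(high);
        low.addAll(high);
        return low;
    }

    public static void main(String[] args) {
        reads = 0;
        // Lazy source: both passes call readInput(), so the input is read twice.
        System.out.println(twoPassSort(CacheSketch::readInput) + " reads=" + reads);

        reads = 0;
        // Materialize once (the analogue of rdd2.cache()), then both passes reuse the list.
        List<Integer> cached = readInput();
        System.out.println(twoPassSort(() -> cached) + " reads=" + reads);
    }
}
```

It prints `[3, 7, 19, 42, 88] reads=2` for the lazy source and `[3, 7, 19, 42, 88] reads=1` when the data is materialized first. In Spark the trade-off is memory: caching keeps the tokenized input in the block manager, which is worthwhile only when re-reading the source costs more.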
> sortBy triggers a new Job
> --------------------------
>
> Key: SPARK-21899
> URL: https://issues.apache.org/jira/browse/SPARK-21899
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 2.2.0
> Reporter: Tongzhou Zhou
>
--
This message was sent by Atlassian JIRA (v6.4.14#64029)