Hi, I would like to understand how Spark pipelines its operations (transformations and actions), along with some details of block storage.

Let's consider the following code:

    // sc is the SparkContext
    val rdd1 = sc.textFile("hdfs://...")
    rdd1.map(func1).map(func2).count

For example, we have a file in HDFS of about 80 GB, already split into 32 files of 2.5 GB each.

q1) How many partitions will rdd1 have?

rule 1) Maybe 32, since there are 32 split files? In most cases this rule holds when the files are not big.

rule 2) Maybe more. I am not sure whether Spark's block store can hold a 2.5 GB partition. Is there a parameter that specifies the block store size? AFAIK, Spark uses the HDFS block size when reading data from HDFS, so there would be (80 GB / hdfsBlockSize) partitions in rdd1, right? Usually the HDFS block size is 64 MB, so we would have 80 GB / 64 MB = 1280 partitions. Is that too many?

Which criterion does Spark use: the number of split files or the HDFS block size?

q2) Here, func1 and func2 are added to the DAG sequentially. What is the workflow at the partition level?

option 1: Given a partition, func1 and func2 are applied sequentially to each element of this partition. After everything is done, we count the number of lines in the partition and send the count to the driver. Then we take the next partition and do the same thing?

option 2: Or else, we apply func1 to all the partitions first, then apply func2 to all the partitions that func1 has been applied to, count the number of lines in each partition and send the results to the driver?

I have done some tests, and it seems that option 1 is correct. Can anyone confirm this? So in option 1, we have 1 job "count" which contains 3 stages: map(func1), map(func2), count.

q3) What if we run out of memory? Suppose we have 12 cores and 15 GB of memory in the cluster.

Case 1: Suppose func1 takes one line of the file and creates a big object for each line, so a partition becomes a large partition once func1 has been applied to it. If we have 12 cores in the cluster, that means we may have 12 large partitions in memory. What if these partitions are much bigger than the memory? What will happen? An exception (OOM / heap size, etc.)?
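
To make the two options in q2 precise, here is a minimal sketch in plain Scala (no Spark involved). A partition is modelled as an Iterator[String]; the bodies of func1 and func2, the trace buffer, and the object name are hypothetical stand-ins for illustration only. It only shows what I mean by each option and says nothing about what Spark actually does internally.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  // Records the order in which func1 and func2 are called.
  val trace: ListBuffer[String] = ListBuffer.empty[String]

  // Hypothetical stand-ins for func1 and func2 from the question.
  def func1(line: String): String = { trace += s"func1($line)"; line.toUpperCase }
  def func2(line: String): Int = { trace += s"func2($line)"; line.length }

  // Option 1: the two maps are chained lazily, so each element passes
  // through func1 and then func2 before the next element is read.
  def option1(partition: Iterator[String]): Long =
    partition.map(func1).map(func2).foldLeft(0L)((n, _) => n + 1)

  // Option 2: the whole partition is materialised after func1,
  // and only then is func2 applied to it.
  def option2(partition: Iterator[String]): Long = {
    val afterFunc1: Vector[String] = partition.map(func1).toVector
    val afterFunc2: Vector[Int] = afterFunc1.map(func2)
    afterFunc2.size.toLong
  }

  def main(args: Array[String]): Unit = {
    trace.clear()
    println(option1(Iterator("a", "b")))
    println(trace.mkString(", "))

    trace.clear()
    println(option2(Iterator("a", "b")))
    println(trace.mkString(", "))
  }
}
```

In this sketch, option1 never holds more than one mapped element at a time, while option2 holds the entire output of func1 in memory, which is the situation Case 1 worries about.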

Case 2: Suppose the input is 80 GB, but we force the RDD to be repartitioned into 6 partitions, which is smaller than the number of cores. Normally, each partition is sent to a core, so all of the input would be in memory. However, we only have 15 GB of memory in the cluster. What will happen? An OOM exception? Then, could we just split the RDD into more partitions so that (80 GB / #partitions) * 12 (the # of cores) < 15 GB (the memory size)? Meanwhile, we cannot split it into too many partitions, since that adds overhead for task distribution.

If we read data from HDFS using the HDFS block size of 64 MB as the partition size, we get a formula like:

    64 MB * # of cores < memory

which is true in most cases. Could this explain why reading from HDFS by block size does not lead to OOM as in Case 2, even if the data is very big?

Sorry for making this post a bit long. I hope I have made myself clear. Any help on any of these questions will be appreciated.

Thank you.

Hao.
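
P.S. To make the arithmetic in q1 and Case 2 concrete, here is a small sketch in plain Scala (no Spark API calls). The object and function names are hypothetical, 1 GB is taken as 1024 MB, and the memory estimate assumes the worst case from Case 2, namely that every running task holds its whole partition in memory at once. That assumption may well not match what Spark really does, which is part of what I am asking.

```scala
object PartitionMath {
  val MB: Long = 1024L * 1024L
  val GB: Long = 1024L * MB

  // Number of partitions if each one holds at most partitionBytes
  // (ceiling division).
  def numPartitions(totalBytes: Long, partitionBytes: Long): Long =
    (totalBytes + partitionBytes - 1) / partitionBytes

  // Bytes in memory if every busy core holds one whole partition
  // (assumed worst case, see the note above).
  def concurrentBytes(totalBytes: Long, partitions: Long, cores: Int): Long =
    totalBytes * math.min(cores.toLong, partitions) / partitions

  // Smallest partition count p (with p >= cores) such that
  // (totalBytes / p) * cores < memoryBytes.
  def minPartitions(totalBytes: Long, cores: Int, memoryBytes: Long): Long =
    totalBytes * cores / memoryBytes + 1

  def main(args: Array[String]): Unit = {
    val total = 80 * GB
    val memory = 15 * GB
    val cores = 12

    println(numPartitions(total, 64 * MB))               // 1280
    println(concurrentBytes(total, 6, cores) / GB)       // 80
    println(concurrentBytes(total, 1280, cores) / MB)    // 768
    println(minPartitions(total, cores, memory))         // 65
  }
}
```

Under these assumptions, 6 partitions would need all 80 GB at once, 64 MB partitions would need only 768 MB across 12 cores, and at least 65 partitions would be needed to stay under 15 GB.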