Re: How does partitioning happen for binary files in Spark?
The code that you see on GitHub is for version 2.1. For versions below that, the default number of partitions for binary files is set to 2, which you can change via the minPartitions value. I am not sure how the minPartitions parameter will behave starting with 2.1, because, as you said, the value is completely ignored.

Thanks,
Jayadeep

On Thu, Apr 6, 2017 at 3:43 PM, ashwini anand wrote:

> By looking into the source code, I found that for textFile(), the
> partitioning is computed by the computeSplitSize() function in the
> FileInputFormat class. This function takes into consideration the
> minPartitions value passed by the user. As far as I understand, the same
> thing for binaryFiles() is computed by the setMinPartitions() function of
> the PortableDataStream class. This setMinPartitions() function completely
> ignores the minPartitions value passed by the user. However, I find that in
> my application the partitioning somehow varies based on the minPartitions
> value for binaryFiles() too. I have no idea how this is happening. Please
> help me understand how partitioning happens in the case of binaryFiles().
>
> The source code for setMinPartitions() is as follows:
>
>   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>     val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>     val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>     val defaultParallelism = sc.defaultParallelism
>     val files = listStatus(context).asScala
>     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>     val bytesPerCore = totalBytes / defaultParallelism
>     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
>     super.setMaxSplitSize(maxSplitSize)
>   }
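As a quick empirical check, you can print the partition count that binaryFiles() actually produces for a few different minPartitions hints. This is only a minimal sketch: the local[4] master, the /data/binary input directory, and the hint values are placeholder assumptions for illustration, not anything from the Spark sources.

  import org.apache.spark.{SparkConf, SparkContext}

  object BinaryFilesPartitionCheck {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("binaryFiles-partition-check").setMaster("local[4]")
      val sc = new SparkContext(conf)

      val path = "/data/binary" // placeholder: point this at any directory of files

      // Compare the resulting partition count for several minPartitions hints.
      for (minPartitions <- Seq(1, 2, 8, 32)) {
        val rdd = sc.binaryFiles(path, minPartitions)
        println(s"minPartitions=$minPartitions => partitions=${rdd.getNumPartitions}")
      }

      sc.stop()
    }
  }

Running this on the same input under a pre-2.1 build and a 2.1 build should show directly whether the hint still has any effect.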
How does partitioning happen for binary files in Spark?
By looking into the source code, I found that for textFile(), the partitioning is computed by the computeSplitSize() function in the FileInputFormat class. This function takes into consideration the minPartitions value passed by the user. As far as I understand, the same thing for binaryFiles() is computed by the setMinPartitions() function of the PortableDataStream class. This setMinPartitions() function completely ignores the minPartitions value passed by the user. However, I find that in my application the partitioning somehow varies based on the minPartitions value for binaryFiles() too. I have no idea how this is happening. Please help me understand how partitioning happens in the case of binaryFiles().

The source code for setMinPartitions() is as follows:

  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
    val defaultParallelism = sc.defaultParallelism
    val files = listStatus(context).asScala
    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    super.setMaxSplitSize(maxSplitSize)
  }
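To make the arithmetic above concrete, here is a standalone sketch of the same computation with assumed numbers: the 128 MB and 4 MB values are the documented defaults for spark.files.maxPartitionBytes and spark.files.openCostInBytes, while the parallelism of 8 and the 100 files of 10 MB each are made up for illustration. Note that minPartitions never enters the formula at all, which is why it appears to be ignored.

  // Standalone sketch of the split-size arithmetic in setMinPartitions,
  // with assumed inputs (see lead-in above).
  object SplitSizeMath {
    def main(args: Array[String]): Unit = {
      val defaultMaxSplitBytes = 128L * 1024 * 1024 // spark.files.maxPartitionBytes default
      val openCostInBytes      = 4L * 1024 * 1024   // spark.files.openCostInBytes default
      val defaultParallelism   = 8L                 // assumed cluster parallelism

      // Assume 100 files of 10 MB each; each file's size is padded with the open cost.
      val fileSizes    = Seq.fill(100)(10L * 1024 * 1024)
      val totalBytes   = fileSizes.map(_ + openCostInBytes).sum
      val bytesPerCore = totalBytes / defaultParallelism

      val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
      println(s"totalBytes=$totalBytes bytesPerCore=$bytesPerCore maxSplitSize=$maxSplitSize")
      // With these numbers: totalBytes = 100 * 14 MB = 1400 MB,
      // bytesPerCore = 175 MB, so maxSplitSize is capped at the 128 MB default.
    }
  }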