Re: How does partitioning happen for binary files in spark ?

2017-04-06 Thread Jay
The code that you see on GitHub is for version 2.1. For versions below
that, the default number of partitions for binary files is 2, which you can
change by passing the minPartitions value. I am not sure how the
minPartitions parameter behaves starting with 2.1 because, as you said, the
field is completely ignored there.
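
For context, the pre-2.1 default of 2 comes from
SparkContext.defaultMinPartitions, which is math.min(defaultParallelism, 2).
A minimal plain-Scala sketch of that rule (the standalone helper below is
mine, not Spark's own API):

```scala
// Mirrors SparkContext.defaultMinPartitions = math.min(defaultParallelism, 2):
// with 2 or more cores available, binaryFiles() falls back to 2 partitions
// unless an explicit minPartitions argument is supplied.
def defaultMinPartitions(defaultParallelism: Int): Int =
  math.min(defaultParallelism, 2)

println(defaultMinPartitions(8)) // 2 on a typical multi-core cluster
println(defaultMinPartitions(1)) // 1 on a single-core local run
```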

Thanks,
Jayadeep

On Thu, Apr 6, 2017 at 3:43 PM, ashwini anand  wrote:

> By looking into the source code, I found that for textFile() the
> partitioning is computed by the computeSplitSize() function in the
> FileInputFormat class. This function takes the minPartitions value passed
> by the user into consideration. As per my understanding, the corresponding
> computation for binaryFiles() is done by the setMinPartitions() function of
> the PortableDataStream class, and that function completely ignores the
> minPartitions value passed by the user. However, I find that in my
> application the number of partitions somehow still varies with the
> minPartitions value for binaryFiles() too. I have no idea how this is
> happening. Please help me understand how partitioning happens in the case
> of binaryFiles().
>
> The source code for setMinPartitions() is as below:
>
>   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>     val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>     val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>     val defaultParallelism = sc.defaultParallelism
>     val files = listStatus(context).asScala
>     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>     val bytesPerCore = totalBytes / defaultParallelism
>     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
>     super.setMaxSplitSize(maxSplitSize)
>   }



How does partitioning happen for binary files in spark ?

2017-04-06 Thread ashwini anand
By looking into the source code, I found that for textFile() the
partitioning is computed by the computeSplitSize() function in the
FileInputFormat class. This function takes the minPartitions value passed
by the user into consideration. As per my understanding, the corresponding
computation for binaryFiles() is done by the setMinPartitions() function of
the PortableDataStream class, and that function completely ignores the
minPartitions value passed by the user. However, I find that in my
application the number of partitions somehow still varies with the
minPartitions value for binaryFiles() too. I have no idea how this is
happening. Please help me understand how partitioning happens in the case
of binaryFiles().

The source code for setMinPartitions() is as below:

  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
    val defaultParallelism = sc.defaultParallelism
    val files = listStatus(context).asScala
    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    super.setMaxSplitSize(maxSplitSize)
  }



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-partitioning-happen-for-binary-files-in-spark-tp28575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.