[ https://issues.apache.org/jira/browse/SPARK-31948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhengruifeng updated SPARK-31948:
---------------------------------
    Description: 
1. {{aggregateByKey}}, {{reduceByKey}} and {{foldByKey}} always perform {{mapSideCombine}}, and there is currently no parameter to turn it off.

However, this step can sometimes be skipped, especially in ML (e.g. {{RobustScaler}}):
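{code:scala}
// Each partition builds one QuantileSummaries per feature and emits it keyed by
// feature index, so every key occurs at most once per partition before reduceByKey.
vectors.mapPartitions { iter =>
  if (iter.hasNext) {
    val summaries = Array.fill(numFeatures)(
      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))
    while (iter.hasNext) {
      val vec = iter.next()
      // Insert every non-NaN value into the summary of its feature.
      vec.foreach { (i, v) => if (!v.isNaN) summaries(i) = summaries(i).insert(v) }
    }
    Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress))
  } else Iterator.empty
}.reduceByKey { case (s1, s2) => s1.merge(s2) }
{code}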
 

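This {{reduceByKey}} in {{RobustScaler}} does not need {{mapSideCombine}} at all: each partition emits at most one record per key (feature index), so there is nothing to combine on the map side. Similar cases exist in {{KMeans}}, {{GMM}}, etc.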

To my knowledge, {{mapSideCombine}} is not needed when the reduction factor is low, i.e. when each map partition contains only a few records per key.
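To illustrate the point about the reduction factor, here is a tiny Spark-free model ({{MapSideCombineCost}} and {{shuffledRecords}} are hypothetical names, for illustration only). A partition is just a {{Seq}} of (key, value) records, and {{shuffledRecords}} counts what the map tasks would write: one record per distinct key per partition with map-side combining, every record without it. It deliberately ignores record sizes and the cost of the map-side hash map itself.

```scala
// Spark-free model: a "partition" is a Seq of (key, value) records.
// Hypothetical helper for illustration only; it counts records, not bytes.
object MapSideCombineCost {
  type Partition[K, V] = Seq[(K, V)]

  /** Records the map tasks would write to the shuffle. */
  def shuffledRecords[K, V](partitions: Seq[Partition[K, V]], mapSideCombine: Boolean): Int =
    if (mapSideCombine) partitions.map(_.map(_._1).distinct.size).sum // one combined record per distinct key
    else partitions.map(_.size).sum                                   // every record is shuffled as-is

  def main(args: Array[String]): Unit = {
    val numFeatures = 4
    // RobustScaler-like: each partition emits one summary per feature index.
    val summaryLike = Seq.fill(3)((0 until numFeatures).map(i => (i, s"summary-$i")))
    // Word-count-like: many records share a key within each partition.
    val wordCountLike = Seq.fill(3)(Seq.fill(100)(("spark", 1)) ++ Seq.fill(100)(("ml", 1)))

    println(shuffledRecords(summaryLike, mapSideCombine = true))    // 12
    println(shuffledRecords(summaryLike, mapSideCombine = false))   // 12
    println(shuffledRecords(wordCountLike, mapSideCombine = true))  // 6
    println(shuffledRecords(wordCountLike, mapSideCombine = false)) // 600
  }
}
```

In the RobustScaler-like case both counts are 12, so map-side combining cannot shrink the shuffle and only adds a hash-map pass; in the word-count-like case it cuts 600 records down to 6.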

 

2. {{treeAggregate}} and {{treeReduce}} are based on {{foldByKey}}; the {{mapSideCombine}} in the first call of {{foldByKey}} can also be avoided, since at that point each partition holds a single partially aggregated value.
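A minimal sketch of why that first {{foldByKey}} has nothing to combine, modelled on my reading of {{RDD.treeAggregate}}. The {{scale}} formula and loop condition are reproduced from memory and should be checked against the source; {{TreeAggregateFirstRound}} and its methods are hypothetical names. Partition {{i}} holds one partially aggregated value and emits it under key {{i % curNumPartitions}}, so no map partition ever contains two records to merge.

```scala
// Spark-free model of the first tree-aggregation round.
// Assumption: mirrors RDD.treeAggregate's scale/loop condition as recalled; not Spark code.
object TreeAggregateFirstRound {
  /** Keys written by each input partition in the first foldByKey round, or None if
    * there are too few partitions for tree aggregation to add a level. */
  def firstRoundKeys(numPartitions: Int, depth: Int): Option[Seq[Seq[Int]]] = {
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    if (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      val curNumPartitions = numPartitions / scale
      // Partition i holds exactly one partially aggregated value, keyed by i % curNumPartitions.
      Some((0 until numPartitions).map(i => Seq(i % curNumPartitions)))
    } else None
  }

  /** Records map-side combining could merge away: per partition, records minus distinct keys. */
  def combinableRecords(keysPerPartition: Seq[Seq[Int]]): Int =
    keysPerPartition.map(p => p.size - p.distinct.size).sum

  def main(args: Array[String]): Unit =
    firstRoundKeys(numPartitions = 100, depth = 2) match {
      // prints "records map-side combine could merge away: 0"
      case Some(keys) => println(s"records map-side combine could merge away: ${combinableRecords(keys)}")
      case None       => println("no foldByKey round")
    }
}
```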

 

SPARK-772: "Map side combine in group by key case does not reduce the amount of data shuffled. Instead, it forces a lot more objects to go into old gen, and leads to worse GC." This is why {{groupByKey}} already disables {{mapSideCombine}}.

 

So what about:

1. exposing {{mapSideCombine}} in {{aggregateByKey}}/{{reduceByKey}}/{{foldByKey}}, so that users can disable unnecessary {{mapSideCombine}};

2. disabling {{mapSideCombine}} in the first call of {{foldByKey}} in {{treeAggregate}} and {{treeReduce}};

3. disabling unnecessary {{mapSideCombine}} in ML.
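To make proposal 1 concrete: in Spark, {{reduceByKey}}, {{foldByKey}} and {{aggregateByKey}} all delegate to {{combineByKeyWithClassTag}}, which already has a {{mapSideCombine}} parameter, so exposing it is mostly a matter of forwarding a flag. The sketch below is a Spark-free local model of that shape only; {{ProposedMapSideCombineFlag}} and its methods are hypothetical, not Spark API, and it assumes immutable zero values (Spark clones {{zeroValue}} per key, which this model skips).

```scala
// Local, Spark-free model of the proposal: each "...ByKey" helper forwards a mapSideCombine
// flag to one combineByKey-style core. Names mirror PairRDDFunctions, but none of this is Spark API.
object ProposedMapSideCombineFlag {
  type Partition[K, V] = Seq[(K, V)]

  def combineByKey[K, V, C](partitions: Seq[Partition[K, V]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      mapSideCombine: Boolean): Map[K, C] =
    if (mapSideCombine) {
      // Map side: build one combiner per key inside each partition.
      val combined: Seq[Map[K, C]] = partitions.map { p =>
        p.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
          acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
        }
      }
      // Reduce side: merge the per-partition combiners of each key.
      combined.flatten.groupBy(_._1).map { case (k, kcs) => k -> kcs.map(_._2).reduce(mergeCombiners) }
    } else {
      // No map-side work: raw values are "shuffled" and combined only on the reduce side.
      partitions.flatten.groupBy(_._1).map { case (k, kvs) =>
        val vs = kvs.map(_._2)
        k -> vs.tail.foldLeft(createCombiner(vs.head))(mergeValue)
      }
    }

  def reduceByKey[K, V](partitions: Seq[Partition[K, V]], mapSideCombine: Boolean)(
      func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions)((v: V) => v, func, func, mapSideCombine)

  def foldByKey[K, V](partitions: Seq[Partition[K, V]], zeroValue: V, mapSideCombine: Boolean)(
      func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions)((v: V) => func(zeroValue, v), func, func, mapSideCombine)

  def aggregateByKey[K, V, U](partitions: Seq[Partition[K, V]], zeroValue: U, mapSideCombine: Boolean)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] =
    combineByKey[K, V, U](partitions)((v: V) => seqOp(zeroValue, v), seqOp, combOp, mapSideCombine)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(("a", 1), ("b", 2), ("a", 3)), Seq(("a", 4), ("b", 5)))
    val withCombine = reduceByKey(parts, mapSideCombine = true)(_ + _)
    val withoutCombine = reduceByKey(parts, mapSideCombine = false)(_ + _)
    println(withCombine == withoutCombine) // true
    println(withoutCombine("a"))           // 8
  }
}
```

Both settings give the same result for an associative, commutative {{func}} and a neutral zero value, which the {{foldByKey}} docs already require: with map-side combining the zero value is applied once per key per partition, without it only once per key.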

 

 

Friendly ping [~srowen] [~huaxingao] [~weichenxu123] [~hyukjin.kwon] [~viirya]

 

 

 

 

 


> expose mapSideCombine in aggByKey/reduceByKey/foldByKey
> -------------------------------------------------------
>
>                 Key: SPARK-31948
>                 URL: https://issues.apache.org/jira/browse/SPARK-31948
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML, Spark Core
>    Affects Versions: 3.1.0
>            Reporter: zhengruifeng
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
