[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441952#comment-16441952 ]

liuxian edited comment on SPARK-23989 at 4/18/18 6:21 AM:
----------------------------------------------------------

1. Disable 'BypassMergeSortShuffleHandle' and 'SerializedShuffleHandle' by appending {{&& false}} to their conditions in {{SortShuffleManager.registerShuffle}}, so that every shuffle falls through to {{BaseShuffleHandle}} and is written by {{SortShuffleWriter}}:

{code:scala}
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency) && false) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency) && false) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
{code}

 

2. Run this unit test in 'DataFrameAggregateSuite.scala':

{{test("SPARK-21580 ints in aggregation expressions are taken as group-by ordinal.")}}

3. Debugging in IDEA, I captured the following state of the {{PartitionedPairBuffer}}:

{code}
buffer = {PartitionedPairBuffer@9817}
  capacity = 64
  curSize = 2
  data = {Object[128]@9832}
    0 = {Tuple2@9834} "(3,3)"
    1 = {UnsafeRow@9835} "[0,2,2]"
    2 = {Tuple2@9841} "(4,4)"
    3 = {UnsafeRow@9835} "[0,2,2]"
{code}

Entries 1 and 3 are the same {{UnsafeRow}} instance ({{@9835}}), so the value buffered for the first record has already been overwritten by the second.
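The mechanism can be reproduced outside Spark with a minimal sketch. The names below ({{MutableRow}}, {{OverwriteDemo}}) are hypothetical stand-ins for {{UnsafeRow}} and {{PartitionedPairBuffer}}, not Spark classes: an iterator that reuses one mutable object for every record, combined with a buffer that stores only the reference, leaves every buffered value equal to the last record, just like entries 1 and 3 above.

{code:scala}
// Illustration only: MutableRow and the plain ArrayBuffer are stand-ins,
// not Spark's UnsafeRow or PartitionedPairBuffer.
import scala.collection.mutable.ArrayBuffer

object OverwriteDemo {
  // A record whose contents are rewritten in place, like UnsafeRow.
  final class MutableRow(var value: Int) {
    override def toString: String = s"[$value]"
  }

  def main(args: Array[String]): Unit = {
    val reused = new MutableRow(0)
    // The iterator hands out the same object for every record.
    val records = Iterator(3, 4).map { k => reused.value = k; (k, reused) }

    val buffer = ArrayBuffer.empty[Any]
    // Store key, then the value *reference*, without copying.
    records.foreach { case (k, v) => buffer += k; buffer += v }

    // Both value slots point at the same object, so the first one is overwritten:
    println(buffer.mkString(", "))  // 3, [4], 4, [4]
  }
}
{code}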

 

 


> When using `SortShuffleWriter`, the data will be overwritten
> ------------------------------------------------------------
>
>                 Key: SPARK-23989
>                 URL: https://issues.apache.org/jira/browse/SPARK-23989
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: liuxian
>            Priority: Critical
>         Attachments: 无标题2.png
>
>
> When using `SortShuffleWriter`, we only insert 'AnyRef' references into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> {{override def write(records: Iterator[Product2[K, V]])}}
> the values in 'records' are `UnsafeRow` instances, so earlier values will be overwritten.
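One way to avoid the aliasing described above is to copy each mutable value before buffering it inside a {{write(records: Iterator[Product2[K, V]])}}-style method. The sketch below only illustrates the idea with hypothetical stand-ins ({{Row}}, {{write}}); it is not Spark's actual code or its fix:

{code:scala}
// Hypothetical sketch; Row and write are stand-ins, not Spark APIs.
import scala.collection.mutable.ArrayBuffer

object CopyOnInsertSketch {
  // Stand-in for a mutable, reused record such as UnsafeRow.
  final class Row(var value: Int) {
    def copy(): Row = new Row(value)
    override def toString: String = s"[$value]"
  }

  // Buffers each record, copying the (possibly reused) value first,
  // so later in-place mutations do not overwrite buffered entries.
  def write[K](records: Iterator[Product2[K, Row]]): Seq[(K, Row)] = {
    val buffer = ArrayBuffer.empty[(K, Row)]
    records.foreach { rec => buffer += ((rec._1, rec._2.copy())) }
    buffer.toSeq
  }

  def main(args: Array[String]): Unit = {
    val reused = new Row(0)
    val records = Iterator(3, 4).map { k => reused.value = k; (k, reused): Product2[Int, Row] }
    println(write(records).mkString(", "))  // (3,[3]), (4,[4])
  }
}
{code}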


