[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441952#comment-16441952 ]
liuxian edited comment on SPARK-23989 at 4/18/18 6:21 AM:
----------------------------------------------------------

1. Disable 'BypassMergeSortShuffleHandle' and 'SerializedShuffleHandle' by adding '&& false' to the first two conditions in registerShuffle:

    override def registerShuffle[K, V, C](
        shuffleId: Int,
        numMaps: Int,
        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency) && false) {
        // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
        // need map-side aggregation, then write numPartitions files directly and just concatenate
        // them at the end. This avoids doing serialization and deserialization twice to merge
        // together the spilled files, which would happen with the normal code path. The downside is
        // having multiple files open at a time and thus more memory allocated to buffers.
        new BypassMergeSortShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else if (SortShuffleManager.canUseSerializedShuffle(dependency) && false) {
        // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
        new SerializedShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else {
        // Otherwise, buffer map outputs in a deserialized form:
        new BaseShuffleHandle(shuffleId, numMaps, dependency)
      }
    }

2. Run this unit test in 'DataFrameAggregateSuite.scala':

    test("SPARK-21580 ints in aggregation expressions are taken as group-by ordinal.")

3. While debugging in IDEA, I captured this state:

    buffer = {PartitionedPairBuffer@9817}
      capacity = 64
      curSize = 2
      data = {Object[128]@9832}
        0 = {Tuple2@9834} "(3,3)"
        1 = {UnsafeRow@9835} "[0,2,2]"
        2 = {Tuple2@9841} "(4,4)"
        3 = {UnsafeRow@9835} "[0,2,2]"

Entries 1 and 3 (highlighted in the original comment) are the same UnsafeRow instance, @9835, and hold the same contents.

> When using `SortShuffleWriter`, the data will be overwritten
> ------------------------------------------------------------
>
>                 Key: SPARK-23989
>                 URL: https://issues.apache.org/jira/browse/SPARK-23989
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: liuxian
>            Priority: Critical
>         Attachments: 无标题2.png
>
>
> When using `SortShuffleWriter`, we only insert 'AnyRef' (object references) into
> 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
>     override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow` instances, so previously inserted values will be overwritten.
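
For illustration, the overwrite described in this issue can be reproduced outside Spark with a small sketch. It assumes only what the report states: the buffer stores references, and the iterator hands out the same mutable row for every record. MutableRow and RowReuseDemo are hypothetical stand-ins, not Spark classes, and copying before insertion is shown as one common way to avoid aliasing, not necessarily how Spark handles it.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for UnsafeRow: a mutable row that a producer reuses.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
  override def toString: String = s"[$value]"
}

object RowReuseDemo {
  // Yields the SAME MutableRow instance for every record, updating it in place.
  // This mimics an iterator that reuses a single row buffer.
  def reusingIterator(values: Seq[Int]): Iterator[(Int, MutableRow)] = {
    val shared = new MutableRow(0)
    values.iterator.map { v =>
      shared.value = v
      (v, shared)
    }
  }

  // Stores only references, so every entry ends up pointing at the shared row.
  def bufferByReference(records: Iterator[(Int, MutableRow)]): Seq[(Int, MutableRow)] = {
    val buf = ArrayBuffer.empty[(Int, MutableRow)]
    records.foreach(r => buf += r)
    buf.toSeq
  }

  // Copies each row before storing it, so later updates cannot affect it.
  def bufferWithCopy(records: Iterator[(Int, MutableRow)]): Seq[(Int, MutableRow)] = {
    val buf = ArrayBuffer.empty[(Int, MutableRow)]
    records.foreach { case (k, row) => buf += ((k, row.copy())) }
    buf.toSeq
  }

  def main(args: Array[String]): Unit = {
    val aliased = bufferByReference(reusingIterator(Seq(3, 4)))
    val copied = bufferWithCopy(reusingIterator(Seq(3, 4)))
    println(aliased.map(_._2.value).mkString(",")) // prints "4,4"
    println(copied.map(_._2.value).mkString(","))  // prints "3,4"
  }
}
```

In the by-reference case both entries hold the last value written, which matches the debugger dump above where two slots refer to the same UnsafeRow instance.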