[jira] [Commented] (SPARK-3948) Potential file append bugs in ExternalSorter which leads to sort-based shuffle unexpected exception
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171861#comment-14171861 ]

Saisai Shao commented on SPARK-3948:
------------------------------------

Thanks for your help :).

> Potential file append bugs in ExternalSorter which lead to unexpected sort-based shuffle exceptions
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-3948
>                 URL: https://issues.apache.org/jira/browse/SPARK-3948
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>            Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against the latest master
> branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in Snappy,
> deserialization errors in Kryo, and out-of-range offsets in FileManagedBuffer.
> All of these exceptions disappear when we switch to hash-based shuffle.
> After deeper investigation, we found that some shuffle output files are
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see, the total file length of shuffle_6_11_11 is much smaller than
> the other map outputs of the same stage.
> We also dumped the map outputs on the map side to check whether this small output
> is correct; below is the log:
> {noformat}
> In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405
> length:
> 274722 262597 291290 272902 264941 270358 291005 295285 252482
> 287142 232617 259871 233734 241439 228897 234282 253834 235619
> 233803 255532 270739 253825 262087 266404 234273 250120 262983
> 257024 255947 254971 258908 247862 221613 258566 245399 251684
> 274843 226150 264278 245279 225656 235084 239466 212851 242245
> 218781 222191 215500 211548 234256 208601 204113 191923 217895
> 227020 215331 212313 223725 250876 256875 239276 266777 235520
> 237462 234063 242270 246825 255888 235937 236956 233099 264508
> 260303 233294 239061 254856 257475 230105 246553 260412 210355
> 211201 219572 206636 226866 209937 226618 218208 206255 248069
> 221717 222112 215734 248088 239207 246125 239056 241133 253091
> 246738 233128 242794 231606 255737 221123 252115 247286 229688
> 251087 250047 237579 263079 256251 238214 208641 201120 204009
> 200825 211965 200600 194492 226471 194887 226975 215072 206008
> 233288 222132 208860 219064 218162 237126 220465 201343 225711
> 232178 233786 212767 211462 213671 215853 227822 233782 214727
> 247001 228968 247413 222674 214241 184122 215643 207665 219079
> 215185 207718 212723 201613 216600 212591 208174 204195 208099
> 229079 230274 223373 214999 256626 228895 231821 383405 229646
> 220212 245495 245960 227556 213266 237203 203805 240509 239306
> 242365 218416 238487 219397 240026 251011 258369 255365 259811
> 283313 248450 264286 264562 257485 279459 249187 257609 274964
> 292369 273826
> {noformat}
> Here I dumped the file name, the file length, and each partition's length; clearly
> the sum of all partition lengths does not equal the file length. So there may be a
> situation where the partitionWriter in ExternalSorter does not always append to the
> end of the previously written file: parts of the file's content get overwritten,
> which leads to the exceptions mentioned above.
> I also changed copyStream to disable transferTo and use the previous implementation,
> and all of the issues are gone. So there may be a flushing problem in transferTo
> when the amount of processed data is large.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
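The consistency check described in the report (the sum of per-partition lengths must account for every byte of the map output file) can be sketched as a small standalone helper. Note that `ShuffleFileCheck` and `lengthsConsistent` are hypothetical names for illustration, not Spark APIs:

```java
import java.io.File;

public class ShuffleFileCheck {
    /**
     * Returns true when the recorded per-partition lengths account for
     * every byte of the map output file. A mismatch, as in the dump in
     * the report, means some partition overwrote earlier data or the
     * file was truncated.
     */
    static boolean lengthsConsistent(File dataFile, long[] partitionLengths) {
        long sum = 0;
        for (long len : partitionLengths) {
            sum += len;
        }
        return sum == dataFile.length();
    }
}
```

Running this check against the dump above would fail for shuffle_6_11_0.data, since its partition lengths sum to far more than the 383405-byte file.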
[jira] [Commented] (SPARK-3948) Potential file append bugs in ExternalSorter which leads to sort-based shuffle unexpected exception
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171859#comment-14171859 ]

Josh Rosen commented on SPARK-3948:
------------------------------------

Thanks for narrowing the problem down to a single commit; this is extremely helpful. I've assigned this JIRA to you, but I'll help investigate, too.
[jira] [Commented] (SPARK-3948) Potential file append bugs in ExternalSorter which leads to sort-based shuffle unexpected exception
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171834#comment-14171834 ]

Saisai Shao commented on SPARK-3948:
------------------------------------

Hi Josh, according to my observation, this bug occurs in this branch of writePartitionedFile (ExternalSorter), but I cannot reproduce it in local mode with a small data size:

{code}
if (bypassMergeSort && partitionWriters != null) {
  // We decided to write separate files for each partition, so just concatenate them. To keep
  // this simple we spill out the current in-memory collection so that everything is in files.
  spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
  partitionWriters.foreach(_.commitAndClose())
  var out: FileOutputStream = null
  var in: FileInputStream = null
  try {
    out = new FileOutputStream(outputFile)
    for (i <- 0 until numPartitions) {
      in = new FileInputStream(partitionWriters(i).fileSegment().file)
      val size = org.apache.spark.util.Utils.copyStream(in, out, false)
      in.close()
      in = null
      lengths(i) = size
    }
  } finally {
    if (out != null) {
      out.close()
    }
    if (in != null) {
      in.close()
    }
  }
}
{code}

Currently I just revert this commit (https://github.com/apache/spark/commit/246cb3f158686348a698d1c0da3001c314727129) in copyStream to work around this issue. I still need to investigate why transferTo gets the wrong result. Also, would you mind assigning this JIRA to me? Thanks a lot.
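One way the concatenation loop quoted above could corrupt data is if the transferTo path inside copyStream leaves the destination channel's position out of sync, so the next partition starts writing over earlier bytes. A defensive sketch (an illustration of the idea, not the actual Spark patch; `VerifiedCopy` and `copyVerified` are hypothetical names) loops until all bytes are moved, since `FileChannel.transferTo` may transfer fewer bytes than requested per call, and then verifies that the destination position advanced by exactly the source size:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class VerifiedCopy {
    /**
     * Copy all of `in` to `out` via transferTo, then verify that the
     * output channel's position advanced by exactly the number of bytes
     * copied. If it did not, fail loudly instead of letting a later
     * write silently overwrite earlier data.
     */
    static long copyVerified(FileInputStream in, FileOutputStream out) throws IOException {
        FileChannel inCh = in.getChannel();
        FileChannel outCh = out.getChannel();
        long startPos = outCh.position();
        long size = inCh.size();
        long copied = 0;
        // transferTo may move fewer bytes than requested, so loop until done.
        while (copied < size) {
            copied += inCh.transferTo(copied, size - copied, outCh);
        }
        long advanced = outCh.position() - startPos;
        if (advanced != size) {
            throw new IOException("transferTo moved the channel position by "
                + advanced + " bytes, expected " + size);
        }
        return copied;
    }
}
```

Such a check turns a silent overwrite into an immediate, diagnosable IOException at the point of the faulty copy rather than a downstream decompression or deserialization failure.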
[jira] [Commented] (SPARK-3948) Potential file append bugs in ExternalSorter which leads to sort-based shuffle unexpected exception
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171295#comment-14171295 ]

Josh Rosen commented on SPARK-3948:
------------------------------------

Hi [~jerryshao], could you share the changes that you made to copyStream (maybe by attaching a {{git diff}} to this ticket)? I want to play around with your fix to see whether I can spot the problem.
[jira] [Commented] (SPARK-3948) Potential file append bugs in ExternalSorter which leads to sort-based shuffle unexpected exception
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170990#comment-14170990 ]

Saisai Shao commented on SPARK-3948:
------------------------------------

I think SPARK-3630 is related to this issue.