[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177170#comment-14177170 ] Reynold Xin commented on SPARK-3948: How often does this bug manifest? If it is often enough, maybe we want to set the option to false by default instead of true.

Sort-based shuffle can lead to assorted stream-corruption exceptions Key: SPARK-3948 URL: https://issues.apache.org/jira/browse/SPARK-3948 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Saisai Shao Assignee: Saisai Shao Fix For: 1.1.1, 1.2.0

Several exceptions occurred when running TPC-DS queries against the latest master branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in Snappy, deserialization errors in Kryo, and offset-out-of-range errors in FileManagedBuffer; all of these exceptions are gone when we change to hash-based shuffle. After deeper investigation, we found that some shuffle output files are unexpectedly smaller than the others, as the log shows:
{noformat}
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
{noformat}
As you can see, the total file length of shuffle_6_11_11 is much smaller than the other map outputs of the same stage.
We also dumped the map outputs on the map side to check whether this small output is correct; below is the log:
{noformat}
In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405
length: 274722 262597 291290 272902 264941 270358 291005 295285 252482 287142 232617 259871 233734 241439 228897 234282 253834 235619 233803 255532 270739 253825 262087 266404 234273 250120 262983 257024 255947 254971 258908 247862 221613 258566 245399 251684 274843 226150 264278 245279 225656 235084 239466 212851 242245 218781 222191 215500 211548 234256 208601 204113 191923 217895 227020 215331 212313 223725 250876 256875 239276 266777 235520 237462 234063 242270 246825 255888 235937 236956 233099 264508 260303 233294 239061 254856 257475 230105 246553 260412 210355 211201 219572 206636 226866 209937 226618 218208 206255 248069 221717 222112 215734 248088 239207 246125 239056 241133 253091 246738 233128 242794 231606 255737 221123 252115 247286 229688 251087 250047 237579 263079 256251 238214 208641 201120 204009 200825 211965 200600 194492 226471 194887 226975 215072 206008 233288 222132 208860 219064 218162 237126 220465 201343 225711 232178 233786 212767 211462 213671 215853 227822 233782 214727 247001 228968 247413 222674 214241 184122 215643 207665 219079 215185 207718 212723 201613 216600 212591 208174 204195 208099 229079 230274 223373 214999 256626 228895 231821 383405 229646 220212 245495 245960 227556 213266 237203 203805 240509 239306 242365 218416 238487 219397 240026 251011 258369 255365 259811 283313 248450 264286 264562 257485 279459 249187 257609 274964 292369 273826
{noformat}
Here I dumped the file name, the file length, and each partition's length; obviously, the sum of all partition lengths does not equal the file length.
So I think there may be a situation where the partitionWriter in ExternalSorter does not always append to the end of the previously written file; parts of the file's content get overwritten, and this leads to the exceptions I mentioned before. Also, when I changed copyStream to disable transferTo and use the previous stream-copy path, all the issues were gone. So I think there may be some flushing problem in transferTo when the amount of processed data is large. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
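The workaround described above (disabling transferTo) amounts to falling back from an NIO channel transfer to a plain buffered stream copy. As a rough illustration of the two paths — this is a hedged sketch, not Spark's actual copyStream, and the class and method names are hypothetical:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;

public class CopyStreamSketch {
    // Copy `in` to `out`, returning the number of bytes copied.
    // When transferToEnabled is true and both ends are plain files,
    // use FileChannel.transferTo (the fast path suspected of the bug);
    // otherwise fall back to a buffered byte copy.
    static long copyStream(InputStream in, OutputStream out, boolean transferToEnabled)
            throws IOException {
        if (transferToEnabled
                && in instanceof FileInputStream
                && out instanceof FileOutputStream) {
            FileChannel inCh = ((FileInputStream) in).getChannel();
            FileChannel outCh = ((FileOutputStream) out).getChannel();
            long size = inCh.size();
            long count = 0;
            while (count < size) {
                // transferTo may move fewer bytes than requested, so loop.
                count += inCh.transferTo(count, size - count, outCh);
            }
            return count;
        } else {
            // Plain stream copy: immune to the transferTo position issue.
            byte[] buf = new byte[8192];
            long total = 0;
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
                total += n;
            }
            return total;
        }
    }
}
```

Both paths should produce byte-identical output on a healthy kernel; the thread above is about the first path misbehaving on certain kernel versions.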
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177173#comment-14177173 ] Josh Rosen commented on SPARK-3948: --- [~rxin] The patch here should actually fix the issue; that option is just an escape hatch in case we were mistaken and our fix doesn't always work. Users should not have to change any configuration options when applying this fix _unless_ the fix doesn't work for some reason.
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177195#comment-14177195 ] Reynold Xin commented on SPARK-3948: Ok then it sounds good.
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173413#comment-14173413 ] Mridul Muralidharan commented on SPARK-3948: Not exactly; what I was suggesting was:
a) At the beginning of the transferTo method, add
{code:java}
val initialPos = channel.position()
{code}
b) At the bottom of the transferTo method, before returning size, add
{code:java}
val finalPos = channel.position()
if (finalPos == initialPos) {
  logWarning("Hit kernel bug; please upgrade your kernel. Attempting workaround.")
  channel.position(initialPos + size)
} else {
  assert(finalPos == initialPos + size)
}
{code}
From what I understand of the javadoc, this should alleviate the problem; of course, it will need verification on the setup where it is currently failing! Note that the reason I would prefer this over append is simple: the method is a generic stream-copy method, and it might be used (currently, or in the future) in scenarios where append is not true. So it would be good to be defensive about the final state.
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173427#comment-14173427 ] Saisai Shao commented on SPARK-3948: Hi [~mridulm80], thanks a lot for your suggestions; here is the snippet I changed:
{code}
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val initialPos = outChannel.position()
println("size = " + outChannel.size)
println("initial position = " + initialPos)
val size = inChannel.size()
var count = 0L
// In case transferTo transferred less data than we requested.
while (count < size) {
  count += inChannel.transferTo(count, size - count, outChannel)
}
val finalPos = outChannel.position()
println("final position = " + finalPos)
if (initialPos == finalPos) {
  outChannel.position(initialPos + count)
} else {
  assert(finalPos == initialPos + count)
}
{code}
And the result shows as below:
{noformat}
size = 0
initial position = 0
final position = 0
size = 118 for i = 0
size = 118
initial position = 118
final position = 118
size = 118 for i = 1
size = 118
initial position = 236
final position = 236
size = 118 for i = 2
size = 118
initial position = 354
final position = 354
size = 118 for i = 3
size = 118
initial position = 472
final position = 472
size = 118 for i = 4
size = 118
initial position = 590
final position = 590
size = 118 for i = 5
size = 118
initial position = 708
final position = 708
size = 118 for i = 6
size = 118
initial position = 826
final position = 826
size = 118 for i = 7
size = 118
initial position = 944
final position = 944
size = 118 for i = 8
size = 118
initial position = 1062
final position = 1062
size = 118 for i = 9
{noformat}
This still has the problem on my 2.6.32 kernel machine: although the position moves forward, the file size stays at 118. It is OK on my Ubuntu machine, though, so this workaround is probably not feasible.
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173489#comment-14173489 ] Mridul Muralidharan commented on SPARK-3948: Damn, this sucks: transferTo is not using the position of the channel to write the output ... while it does when append is true (which effectively sets the position to end-of-file on the call to getChannel). The state of the channel, based on what we see above, is the same in both cases - since we can see the position is updated - and it is persisted and returned when we call getChannel in the next invocation of copyStreams. So there is some other set of issues at play which we may not be able to work around from the JVM. Given this, I think we should: a) add a logError when initialPosition == finalPosition while inChannel.size > 0, asking users to upgrade to a newer Linux kernel; b) of course use append = true, to work around the immediate issue. (a) will ensure that developers and users/admins are notified of the issue in case other codepaths (currently or in the future) hit the same problem.
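The append = true fix works because opening a FileOutputStream in append mode positions its channel at end-of-file before each copy, so each new chunk starts where the previous one ended regardless of how transferTo handled the channel position. A minimal sketch of concatenating several files this way — the class name is illustrative, not Spark code:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class AppendModeConcat {
    // Append each input file to the end of `dest`. Opening the output
    // stream with append = true makes getChannel() start at end-of-file,
    // which is the workaround adopted in the discussion above.
    static void concat(File dest, File... parts) throws IOException {
        for (File part : parts) {
            try (FileInputStream in = new FileInputStream(part);
                 // Second constructor argument `true` selects append mode.
                 FileOutputStream out = new FileOutputStream(dest, true)) {
                FileChannel inCh = in.getChannel();
                FileChannel outCh = out.getChannel();
                long size = inCh.size();
                long count = 0;
                // transferTo may copy fewer bytes than asked, so loop.
                while (count < size) {
                    count += inCh.transferTo(count, size - count, outCh);
                }
            }
        }
    }
}
```

With append mode, the destination length after the calls should equal the sum of the part lengths, which is exactly the invariant that was violated in the shuffle output files above.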
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173528#comment-14173528 ] Saisai Shao commented on SPARK-3948: Hi [~mridulm80], thanks a lot for your suggestion. I think the other code in Spark that currently uses copyStream will not be affected by this issue, since it only copies one input file to the output file. But use cases like ExternalSorter will indeed be affected by it. I will submit a PR according to your suggestions, thanks a lot.
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173757#comment-14173757 ] Apache Spark commented on SPARK-3948: User 'jerryshao' has created a pull request for this issue: https://github.com/apache/spark/pull/2824
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172055#comment-14172055 ] Saisai Shao commented on SPARK-3948: Hi Josh, thanks for your help. I don't think an un-flushed input file is what makes the output file's copied length wrong, since all of the {{partitionWriters}} are flushed and closed before copying to the output file, as you can see at line 704 in ExternalSorter: {{partitionWriters.foreach(_.commitAndClose())}}. I still suspect that the current use of {{transferTo}} may have some problem which ruins the output file; when I tried adding the append flag for the output file at line 708, the problem seems to be gone.
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172062#comment-14172062 ] Josh Rosen commented on SPARK-3948: --- Hi [~jerryshao], To make sure that I understand, are you saying that changing {code} out = new FileOutputStream(outputFile) {code} to {code} out = new FileOutputStream(outputFile, true) // append = true {code} seems to solve the issue, even while continuing to use the {{transferTo}} code?
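For reference, the two constructors differ only in whether re-opening truncates the file. A quick throwaway demo of the append flag's effect (the helper and file names are made up for this sketch):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class AppendDemo {
    // Returns {length after a truncating re-open, length after an appending re-open}.
    public static long[] demo() throws IOException {
        File f = File.createTempFile("spark-3948-append-demo", ".bin");
        f.deleteOnExit();
        try (FileOutputStream out = new FileOutputStream(f)) {
            out.write(new byte[]{1, 2, 3, 4});   // file length: 4
        }
        try (FileOutputStream out = new FileOutputStream(f)) {
            out.write(new byte[]{5, 6});         // re-open without append truncates: length 2
        }
        long truncated = f.length();
        try (FileOutputStream out = new FileOutputStream(f, true)) {
            out.write(new byte[]{7, 8});         // append mode keeps existing bytes: length 4
        }
        return new long[]{truncated, f.length()};
    }

    public static void main(String[] args) throws IOException {
        long[] lens = demo();
        System.out.println(lens[0] + " then " + lens[1]);  // 2 then 4
    }
}
```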
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172080#comment-14172080 ] Josh Rosen commented on SPARK-3948: --- I'm surprised that the lack of {{append}} didn't cause problems prior to the {{transferTo}} patch. To try to reason this out: *Old code, no append*: We use the same FileOutputStream object to write all of the partitions, copying each partition's data by calling {{FileOutputStream.write}} on that one stream. Since we're not in append mode, the first partition is written at the beginning of the file and all subsequent partitions are appended to it. *New {{transferTo}} code, no append:* We call {{FileOutputStream.getChannel()}} for each partition, which [returns the unique FileChannel object associated with this file output stream|http://docs.oracle.com/javase/7/docs/api/java/io/FileOutputStream.html#getChannel()]. Since we're not in append mode, this channel's initial position should be equal to the number of bytes written to the file so far, which should initially be 0 since we just created a new FileOutputStream and haven't written anything with it. According to [its docs|http://docs.oracle.com/javase/6/docs/api/java/nio/channels/FileChannel.html#transferTo(long, long, java.nio.channels.WritableByteChannel)], {{transferTo}} should increment the target channel's position by the number of bytes written. So in this case, too, it seems like the first partition should be written to the beginning of the file while the others are appended after it. Am I missing an obvious bug here? Is there a reason why these two copy implementations don't behave identically when append is disabled? 
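The documented behavior being reasoned about above can be checked directly: concatenate several part files into one FileOutputStream's channel via transferTo and confirm the channel position advances past each part. This is an illustrative sketch (the class and helper names are invented), not Spark's actual merge code:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

public class TransferToConcat {
    // Concatenate parts into `out` the way the new copy path does:
    // one FileOutputStream, one getChannel() call per partition.
    public static long concat(File out, File... parts) throws IOException {
        try (FileOutputStream os = new FileOutputStream(out)) {
            for (File part : parts) {
                try (FileChannel in = new FileInputStream(part).getChannel()) {
                    FileChannel outCh = os.getChannel();  // same channel every time
                    long before = outCh.position();
                    long written = in.transferTo(0, in.size(), outCh);
                    // Per the FileChannel javadoc the target's position should
                    // advance by exactly `written`; the bug report says this
                    // sometimes fails to hold for large transfers.
                    if (outCh.position() != before + written) {
                        throw new IOException("position did not advance as documented");
                    }
                }
            }
            return os.getChannel().position();  // total bytes written
        }
    }

    public static void main(String[] args) throws IOException {
        File a = File.createTempFile("part-a", ".bin");
        File b = File.createTempFile("part-b", ".bin");
        File out = File.createTempFile("merged", ".bin");
        for (File f : new File[]{a, b, out}) f.deleteOnExit();
        Files.write(a.toPath(), new byte[300]);
        Files.write(b.toPath(), new byte[500]);
        System.out.println(concat(out, a, b));  // 800
    }
}
```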
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172116#comment-14172116 ] Mridul Muralidharan commented on SPARK-3948: [~joshrosen] Assuming there are no VM bugs being hit in inChannel.size() and no other concurrent writes to these files, I don't see any issues with the code - as you elaborated. On the other hand, the external sort code is slightly loose in its use of the file API - not sure if that is causing the observed problems; for example, the use of skip() in SortShuffleManager.scala. We will need to investigate in detail whether some of these are causing the observed problems.
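On the skip() point: {{InputStream.skip}} is allowed to skip fewer bytes than requested (or zero), so any single unchecked call is indeed loose. A defensive loop of the kind being hinted at might look like this ({{skipFully}} is a hypothetical helper, not Spark code):

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class SkipUtil {
    // InputStream.skip may skip fewer bytes than asked, so loop until done,
    // falling back to read() when skip makes no progress.
    public static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
            } else if (in.read() >= 0) {  // skip() made no progress; consume one byte
                remaining -= 1;
            } else {
                throw new EOFException("hit EOF with " + remaining + " bytes left to skip");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[]{10, 20, 30, 40, 50});
        skipFully(in, 3);
        System.out.println(in.read());  // 40
    }
}
```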
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172117#comment-14172117 ] Saisai Shao commented on SPARK-3948: Hi Josh, I think the old code behaves correctly even without the {{append}} flag, because we never close the output stream between partitions, so the input stream for each partition always writes to the end of the output stream. For the new {{transferTo}} code without append, theoretically the target channel's position should increment by the number of bytes written, but according to my investigation through the logs, for subsequent partitions the output channel's position unexpectedly pointed to 0, not to the previous output file size; that is the unexplained behavior which confuses me a lot.
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172134#comment-14172134 ] Mridul Muralidharan commented on SPARK-3948: [~jerryshao] Just to clarify, what exactly is the behavior you are observing? Is it that getChannel is returning a channel which has position == 0 after writing bytes to the stream? If yes, what channel length are you observing in that case? Also, how large are the files? The documentation of getChannel and transferTo is fairly unambiguous, so our code, as written, conforms to it. Of course, it is always possible we are hitting bugs in some scenarios! What is the environment you are running this on, by the way - OS/JVM version? Thanks.
With deep investigation, we found that some shuffle output file is unexpectedly smaller than the others, as the log shows: {noformat} 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509 {noformat} As you can see the total file length of shuffle_6_11_11 is much smaller than other same stage map output results. 
And we also dump the map outputs in map side to see if this small size output is correct or not, below is the log: {noformat} In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local- 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length: 274722 262597 291290 272902 264941 270358 291005 295285 252482 287142 232617 259871 233734 241439 228897 234282 253834 235619 233803 255532 270739 253825 262087 266404 234273 250120 262983 257024 255947 254971 258908 247862 221613 258566 245399 251684 274843 226150 264278 245279 225656 235084 239466 212851 242245 218781 222191 215500 211548 234256 208601 204113 191923 217895 227020 215331 212313 223725 250876 256875 239276 266777 235520 237462 234063 242270 246825 255888 235937 236956 233099 264508 260303 233294 239061 254856 257475 230105 246553 260412 210355 211201 219572 206636 226866 209937 226618 218208 206255 248069 221717 222112 215734 248088 239207 246125 239056 241133 253091 246738 233128 242794 231606 255737 221123 252115 247286 229688 251087 250047 237579 263079 256251 238214 208641 201120 204009 200825 211965 200600 194492 226471 194887 226975 215072 206008 233288 222132 208860 219064 218162 237126 220465 201343 225711 232178 233786 212767 211462 213671 215853 227822 233782 214727 247001 228968 247413 222674 214241 184122 215643 207665 219079 215185 207718 212723 201613 216600 212591 208174 204195 208099 229079 230274 223373 214999 256626 228895 231821 383405 229646 220212 245495 245960 227556 213266 237203 203805 240509 239306 242365 218416 238487 219397 240026 251011 258369 255365 259811 283313 248450 264286 264562 257485 279459 249187 257609 274964 292369 273826 {noformat} Here I dump the file name, length and each partition's length, obviously the sum of all partition lengths is not equal to file length. 
So I think there may be a situation where the partitionWriter in ExternalSorter does not always append to the end of the previously written file; parts of the file's content get overwritten, and this leads to the exceptions I mentioned before. Also, when I changed the code of copyStream to disable transferTo and use the previous implementation, all the issues were gone. So I think there may be some flushing problem in transferTo when the processed data is large.
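The "previous one" referred to here is the plain buffered read/write copy loop that copyStream falls back to for non-file streams. A minimal standalone sketch in plain Java (class name is mine, not Spark's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class BufferedCopy {
    // Plain read/write copy loop; unlike transferTo it never touches
    // FileChannel positions, which is why disabling transferTo avoided the bug.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[8192];
        long count = 0L;
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            count += n;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = copy(new ByteArrayInputStream(new byte[12345]), out);
        System.out.println(n + " " + out.size()); // 12345 12345
    }
}
```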
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172152#comment-14172152 ] Saisai Shao commented on SPARK-3948: Hi [~mridul], what I observed is that, after writing bytes to the channel, I still get position 0 on the next call; here is one of the logs that I traced: {noformat} java.lang.AssertionError: assertion failed: outChannel before size, 272314, before position: 0. Current size: 284167, current position: 0, expected write data size: 284167 {noformat} I just print out the channel size and position *before* and *after* the {{transferTo}} action. As you can see, the output file size is not 0 before the write, but the position is 0, not the end of the channel. After writing to it using {{transferTo}}, the file size changes, but not to 272314 + 284167 as expected, and the position also does not move to the end. When I add the append flag as I mentioned before, the log changes to: {noformat}
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 0, before position: 0. Current size: 294896, current position: 294896, expected write data size: 294896
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 294896, before position: 294896. Current size: 551216, current position: 551216, expected write data size: 256320
{noformat} My OS is Redhat 6.2, and my JVM is 1.8.0_20-ea. Appreciate your suggestions, thanks a lot.
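The append-flag behavior described here follows from documented channel semantics: per the FileOutputStream.getChannel() javadoc, a stream opened in append mode yields a channel whose position equals the file size, so writes always land at the end. A small standalone Java sketch (the file name is made up):

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;

public class AppendPositionDemo {
    public static void main(String[] args) throws IOException {
        Files.write(Paths.get("demo.bin"), new byte[100]); // pre-existing 100-byte file

        // append=true: the channel reports position == size from the start,
        // so new bytes go after the existing 100, never over them.
        try (FileOutputStream out = new FileOutputStream("demo.bin", true)) {
            FileChannel ch = out.getChannel();
            System.out.println(ch.size() + " " + ch.position()); // 100 100
            out.write(new byte[20]);
            System.out.println(ch.size() + " " + ch.position()); // 120 120
        }
    }
}
```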
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172225#comment-14172225 ] Mridul Muralidharan commented on SPARK-3948: That is weird, I tried a stripped-down version to test just this, and it seems to be working fine. In the Scala interpreter, this works as expected. {code:java}
import java.io._
import java.nio.ByteBuffer
import java.nio._
import java.nio.channels._

def copyStream(in: InputStream, out: OutputStream, closeStreams: Boolean = false): Long = {
  var count = 0L
  try {
    if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
      // When both streams are file streams, use transferTo to improve copy performance.
      val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      println("size = " + outChannel.size)
      println("position = " + outChannel.position)
      val size = inChannel.size()
      // In case the transferTo method transferred less data than we requested.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
    } else {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          out.write(buf, 0, n)
          count += n
        }
      }
    }
    count
  } finally {
    if (closeStreams) {
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }
}

val out = new FileOutputStream(output)
for (i <- 0 until 10) {
  val in = new FileInputStream(t)
  val size = copyStream(in, out, false)
  println("size = " + size + " for i = " + i)
  in.close()
}
out.close()
{code} Scenarios tried: a) no output file, b) empty output file, c) non-empty output file. It seemed to work fine (and as expected) for all the cases. Can you try this at your end? I want to eliminate any potential environment issues. I tried this with 1.7.0_55 and 1.8.0_25 ... 
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172226#comment-14172226 ] Mridul Muralidharan commented on SPARK-3948: Note, t is just some file with a few strings in it - simply generate something locally. 
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172371#comment-14172371 ] Saisai Shao commented on SPARK-3948: Hi [~mridulm80], thanks a lot for your suggestions. I tested with different Java versions on two OSes, Redhat 6.2 and Ubuntu 14.04, and I think this behavior is OS or kernel related, not JVM-version related; you can see the output below. First is the result under Redhat; I tried Java 1.8.0_20, 1.7.0_60 and 1.7.0_04, and the result is: {noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 0
size = 118 for i = 1
size = 118
position = 0
size = 118 for i = 2
size = 118
position = 0
size = 118 for i = 3
size = 118
position = 0
size = 118 for i = 4
size = 118
position = 0
size = 118 for i = 5
size = 118
position = 0
size = 118 for i = 6
size = 118
position = 0
size = 118 for i = 7
size = 118
position = 0
size = 118 for i = 8
size = 118
position = 0
size = 118 for i = 9
{noformat} Obviously the position is always 0, so the final output file size is just 118. Then I changed to Ubuntu 14.04, with Java 1.7.0_04 and 1.7.0_51; the result is: {noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 118
size = 118 for i = 1
size = 236
position = 236
size = 118 for i = 2
size = 354
position = 354
size = 118 for i = 3
size = 472
position = 472
size = 118 for i = 4
size = 590
position = 590
size = 118 for i = 5
size = 708
position = 708
size = 118 for i = 6
size = 826
position = 826
size = 118 for i = 7
size = 944
position = 944
size = 118 for i = 8
size = 1062
position = 1062
size = 118 for i = 9
{noformat} This is the result we expected, and the position also seeks to the right end, so the final output file size is 1180 as expected. My Ubuntu machine's kernel version is 3.13.0-37-generic; the Redhat machine's kernel version is 2.6.32-220.el6.x86_64. So I guess the difference depends on the kernel or OS version, but this still needs to be verified. 
Besides, to keep consistency, I think we can add an *append=true* flag to enforce the same output even on different platforms. 
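The proposed append=true fix can be sketched as follows: the destination stream is opened in append mode, so every copy starts at the current end of file regardless of platform. A hypothetical standalone Java version (file names are mine; on kernels where sendfile rejects O_APPEND descriptors, the JDK falls back internally to a non-sendfile copy path):

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;

public class AppendCopyDemo {
    // Copy src onto the END of dst; append=true makes the destination channel
    // start at the current end of file on every platform.
    static long copyAppend(String src, String dst) throws IOException {
        try (FileInputStream in = new FileInputStream(src);
             FileOutputStream out = new FileOutputStream(dst, true)) {
            FileChannel inCh = in.getChannel();
            FileChannel outCh = out.getChannel();
            long size = inCh.size();
            long count = 0L;
            while (count < size) { // transferTo may copy less than requested
                count += inCh.transferTo(count, size - count, outCh);
            }
            return count;
        }
    }

    public static void main(String[] args) throws IOException {
        Files.write(Paths.get("t.bin"), new byte[118]); // mimic the 118-byte test file
        copyAppend("t.bin", "out.bin");
        copyAppend("t.bin", "out.bin");
        // Two appended copies: the destination grows to 2 x 118 bytes.
        System.out.println(Files.size(Paths.get("out.bin")));
    }
}
```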
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172582#comment-14172582 ] Josh Rosen commented on SPARK-3948: That kernel bug seems like it could explain why the {{transferTo}} change caused problems in sort-based shuffle. It looks like SPARK-3630 also describes some other places where the Kryo PARSING_ERROR(2) has occurred, so I'm going to try to figure out which of these other cases might also hit this {{transferTo}} code path. In terms of a bug fix / workaround: maybe we can open the file in append mode, since that seems to solve the problem, and also add an internal configuration option to disable the transferTo code; this configuration / feature-flag would provide an escape hatch for users in case the append fix doesn't work. 
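The escape-hatch idea can be sketched as a feature flag choosing between the two copy paths. The property name below is hypothetical, purely for illustration; the real option name would be decided in the fix:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;

public class CopyWithEscapeHatch {
    // Hypothetical flag name, standing in for the internal configuration
    // option proposed above; not an actual Spark property.
    static final boolean USE_TRANSFER_TO =
        Boolean.parseBoolean(System.getProperty("demo.copy.transferTo", "true"));

    static long copy(InputStream in, OutputStream out) throws IOException {
        if (USE_TRANSFER_TO
                && in instanceof FileInputStream
                && out instanceof FileOutputStream) {
            // Fast path: channel-to-channel transfer.
            FileChannel inCh = ((FileInputStream) in).getChannel();
            FileChannel outCh = ((FileOutputStream) out).getChannel();
            long size = inCh.size(), count = 0L;
            while (count < size) {
                count += inCh.transferTo(count, size - count, outCh);
            }
            return count;
        }
        // Escape hatch: plain buffered loop when the flag is off
        // or the streams are not file streams.
        byte[] buf = new byte[8192];
        long count = 0L;
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            count += n;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(copy(new ByteArrayInputStream(new byte[7]), out));
    }
}
```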
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172585#comment-14172585 ] Mridul Muralidharan commented on SPARK-3948: [~jerryshao] great work! I agree, append might be a workaround to consider (given the semantics of getChannel when the stream is opened with append). On the other hand, since this piece of code (the copyStreams) might also be used in a general context, what about logging a warning in case position != initialPosition + size at the end of the transferTo loop, warning users that they should upgrade their kernel (and explicitly modifying the position as a workaround)? 
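The suggested guard could look roughly like this in plain Java: after the transferTo loop, compare the destination channel's position against initialPosition + count, warn if it did not advance, and set the position explicitly as the workaround (a sketch only; class and file names are mine):

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;

public class GuardedTransferTo {
    // After the transferTo loop, verify that the destination channel's
    // position actually advanced by `count`; if not, warn (the kernel likely
    // mishandled the copy) and fix the position by hand.
    static long copy(FileChannel inCh, FileChannel outCh) throws IOException {
        long initialPos = outCh.position();
        long size = inCh.size();
        long count = 0L;
        while (count < size) {
            count += inCh.transferTo(count, size - count, outCh);
        }
        if (outCh.position() != initialPos + count) {
            System.err.println("WARN: channel position did not advance as expected; "
                + "your kernel may have a transferTo bug, consider upgrading");
            outCh.position(initialPos + count); // explicit workaround
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Files.write(Paths.get("in.bin"), new byte[50]);
        try (FileInputStream in = new FileInputStream("in.bin");
             FileOutputStream out = new FileOutputStream("out.bin")) {
            long n = copy(in.getChannel(), out.getChannel());
            // On a healthy kernel the position ends up equal to the byte count.
            System.out.println(n + " " + out.getChannel().position());
        }
    }
}
```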
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172895#comment-14172895 ] Josh Rosen commented on SPARK-3948: [~mridulm80], that explicit position-checking and modification idea is a clever workaround. I think that this JIRA explains the cause of the stream-corruption issues that we've seen in the sort-based shuffle code, but after some investigation I think that it doesn't explain the similar errors that we've seen when deserializing tasks that were broadcasted with TorrentBroadcast; I'll keep investigating this. 
With deep investigation, we found that some shuffle output file is unexpectedly smaller than the others, as the log shows: {noformat} 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509 {noformat} As you can see the total file length of shuffle_6_11_11 is much smaller than other same stage map output results. 
And we also dump the map outputs in map side to see if this small size output is correct or not, below is the log: {noformat} In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local- 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length: 274722 262597 291290 272902 264941 270358 291005 295285 252482 287142 232617 259871 233734 241439 228897 234282 253834 235619 233803 255532 270739 253825 262087 266404 234273 250120 262983 257024 255947 254971 258908 247862 221613 258566 245399 251684 274843 226150 264278 245279 225656 235084 239466 212851 242245 218781 222191 215500 211548 234256 208601 204113 191923 217895 227020 215331 212313 223725 250876 256875 239276 266777 235520 237462 234063 242270 246825 255888 235937 236956 233099 264508 260303 233294 239061 254856 257475 230105 246553 260412 210355 211201 219572 206636 226866 209937 226618 218208 206255 248069 221717 222112 215734 248088 239207 246125 239056 241133 253091 246738 233128 242794 231606 255737 221123 252115 247286 229688 251087 250047 237579 263079 256251 238214 208641 201120 204009 200825 211965 200600 194492 226471 194887 226975 215072 206008 233288 222132 208860 219064 218162 237126 220465 201343 225711 232178 233786 212767 211462 213671 215853 227822 233782 214727 247001 228968 247413 222674 214241 184122 215643 207665 219079 215185 207718 212723 201613 216600 212591 208174 204195 208099 229079 230274 223373 214999 256626 228895 231821 383405 229646 220212 245495 245960 227556 213266 237203 203805 240509 239306 242365 218416 238487 219397 240026 251011 258369 255365 259811 283313 248450 264286 264562 257485 279459 249187 257609 274964 292369 273826 {noformat} Here I dump the file name, length and each partition's length, obviously the sum of all partition lengths is not equal to file length. 
So I think there may be a situation where the partitionWriter in ExternalSorter does not always append to the end of the previously written file; parts of the file's content get overwritten, and this leads to the exceptions I mentioned before. Also, when I changed the code of copyStream to disable transferTo and fall back to the previous implementation, all the issues were gone. So I think there may be some flushing problem in transferTo when the processed data is large.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
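One defensive pattern consistent with the hypothesis above is to never trust a single transferTo call: FileChannel.transferTo is documented to possibly transfer fewer bytes than requested, so a robust copy loops until the expected count has actually been moved. This is an illustrative standalone sketch, not Spark's copyStream, and the names are hypothetical:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

public class SafeCopy {
    // Copy bytesToCopy bytes from input (starting at startPosition) into
    // output, looping because transferTo may move fewer bytes per call
    // than requested. Returns the number of bytes actually copied.
    static long copyFileStreamNIO(FileChannel input, FileChannel output,
                                  long startPosition, long bytesToCopy) throws IOException {
        long count = 0;
        while (count < bytesToCopy) {
            long transferred =
                input.transferTo(startPosition + count, bytesToCopy - count, output);
            if (transferred == 0) {
                break; // no progress (e.g. past EOF); avoid spinning forever
            }
            count += transferred;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        File src = File.createTempFile("safecopy-src", ".bin");
        Files.write(src.toPath(), new byte[]{1, 2, 3, 4, 5});
        File dst = File.createTempFile("safecopy-dst", ".bin");
        try (FileChannel in = new FileInputStream(src).getChannel();
             FileChannel out = new FileOutputStream(dst).getChannel()) {
            long copied = copyFileStreamNIO(in, out, 0, src.length());
            System.out.println("copied " + copied + " bytes");
        }
    }
}
```

A caller can additionally verify that the returned count equals the requested count and fail loudly otherwise, which is exactly the kind of check that would have surfaced this corruption at write time instead of as downstream decompression errors.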
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171920#comment-14171920 ] Patrick Wendell commented on SPARK-3948: Great, thanks [~jerryshao] - I updated the description. It sounds like this is just an issue with something in the sort-based code that can cause all kinds of random stream exceptions.
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172024#comment-14172024 ] Josh Rosen commented on SPARK-3948: I've been looking over the code in https://github.com/apache/spark/pull/1884 and I haven't spotted anything glaringly wrong. One hunch, though: in the {{transferTo}} code, we read the size of the file up front and then copy that many bytes, whereas the old, non-{{transferTo}} code keeps reading until it hits the end of the stream. This could potentially account for different behavior if the size of the underlying file increased while the copy was taking place.
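The hunch about size-up-front versus read-until-EOF copying can be illustrated with two standalone helpers (hypothetical names, not Spark code). The first snapshots the file length and copies exactly that many bytes, silently dropping anything appended afterwards; the second reads until EOF and therefore picks up late writes:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyModes {
    // Size-up-front copy (analogous to the transferTo path): capture the
    // file length once, then copy exactly that many bytes. If the file
    // grows after the snapshot, the extra bytes are silently dropped.
    static long copyFixedSize(File src, OutputStream out) throws IOException {
        long size = src.length(); // snapshot taken before copying starts
        try (InputStream in = new FileInputStream(src)) {
            byte[] buf = new byte[8192];
            long copied = 0;
            while (copied < size) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, size - copied));
                if (n == -1) break; // file shrank below the snapshot
                out.write(buf, 0, n);
                copied += n;
            }
            return copied;
        }
    }

    // Read-until-EOF copy (the old, non-transferTo path): keep reading
    // until the stream ends, so bytes appended mid-copy are still seen.
    static long copyToEof(File src, OutputStream out) throws IOException {
        try (InputStream in = new FileInputStream(src)) {
            byte[] buf = new byte[8192];
            long copied = 0;
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
                copied += n;
            }
            return copied;
        }
    }
}
```

On a quiescent file the two helpers copy identical bytes; they only diverge when the source file is still being appended to during the copy, which is exactly the race this comment speculates about.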