[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-20 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177170#comment-14177170
 ] 

Reynold Xin commented on SPARK-3948:


How often does this bug manifest? If it is often enough, maybe we want to set 
the option to false by default instead of true. 

 Sort-based shuffle can lead to assorted stream-corruption exceptions
 

 Key: SPARK-3948
 URL: https://issues.apache.org/jira/browse/SPARK-3948
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao
 Fix For: 1.1.1, 1.2.0


 Several exceptions occurred when running TPC-DS queries against the latest 
 master branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in 
 Snappy, deserialization errors in Kryo, and offset-out-of-range errors in 
 FileManagedBuffer; all of these exceptions are gone when we change to 
 hash-based shuffle.
 After deeper investigation, we found that some shuffle output files are 
 unexpectedly smaller than the others, as the log shows:
 {noformat}
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
 {noformat}
 As you can see, the total file length of shuffle_6_11_11 is much smaller than 
 that of the other map outputs from the same stage.
 We also dumped the map outputs on the map side to check whether this small 
 output is correct; below is the log:
 {noformat}
  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405, length:
  274722 262597 291290 272902 264941 270358 291005 295285 252482 
 287142 232617 259871 233734 241439 228897 234282 253834 235619 
 233803 255532 270739 253825 262087 266404 234273 250120 262983 
 257024 255947 254971 258908 247862 221613 258566 245399 251684 
 274843 226150 264278 245279 225656 235084 239466 212851 242245 
 218781 222191 215500 211548 234256 208601 204113 191923 217895 
 227020 215331 212313 223725 250876 256875 239276 266777 235520 
 237462 234063 242270 246825 255888 235937 236956 233099 264508 
 260303 233294 239061 254856 257475 230105 246553 260412 210355 
 211201 219572 206636 226866 209937 226618 218208 206255 248069 
 221717 222112 215734 248088 239207 246125 239056 241133 253091 
 246738 233128 242794 231606 255737 221123 252115 247286 229688 
 251087 250047 237579 263079 256251 238214 208641 201120 204009 
 200825 211965 200600 194492 226471 194887 226975 215072 206008 
 233288 222132 208860 219064 218162 237126 220465 201343 225711 
 232178 233786 212767 211462 213671 215853 227822 233782 214727 
 247001 228968 247413 222674 214241 184122 215643 207665 219079 
 215185 207718 212723 201613 216600 212591 208174 204195 208099 
 229079 230274 223373 214999 256626 228895 231821 383405 229646 
 220212 245495 245960 227556 213266 237203 203805 240509 239306 
 242365 218416 238487 219397 240026 251011 258369 255365 259811 
 283313 248450 264286 264562 257485 279459 249187 257609 274964 
 292369 273826
 {noformat}
 Here I dump the file name, the file length, and each partition's length; 
 obviously the sum of all partition lengths does not equal the file length. So 
 I think there may be a situation where partitionWriter in ExternalSorter does 
 not always append to the end of the previously written file; parts of the 
 file's content get overwritten, and this leads to the exceptions I mentioned 
 before.
 Also, when I changed copyStream to disable transferTo and use the previous 
 implementation, all the issues were gone. So I think there may be some 
 flushing problem in transferTo when the amount of processed data is large.




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-20 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177173#comment-14177173
 ] 

Josh Rosen commented on SPARK-3948:
---

[~rxin] The patch here should actually fix the issue; that option is just an 
escape hatch in case we were mistaken and our fix doesn't always work.  Users 
should not have to change any configuration options when applying this fix 
_unless_ the fix doesn't work for some reason.
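
For anyone who does need the escape hatch, flipping it at submit time might 
look like the sketch below. Note that this is an assumption: the thread does 
not name the option, and the key {{spark.file.transferTo}} is inferred from 
the pull request linked later in this thread - verify it against the merged 
patch.

{code}
import org.apache.spark.SparkConf

// Assumed option name - check the merged patch before relying on it.
// Setting it to false falls back to the stream-based copy path.
val conf = new SparkConf()
  .setAppName("shuffle-transferTo-workaround")
  .set("spark.file.transferTo", "false")
{code}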




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-20 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177195#comment-14177195
 ] 

Reynold Xin commented on SPARK-3948:


Ok then it sounds good. 




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173413#comment-14173413
 ] 

Mridul Muralidharan commented on SPARK-3948:


Not exactly; what I was suggesting was:

a) At the beginning of the transferTo method, add
{code:java}
  val initialPos = channel.position()
{code}
b) At the bottom of the transferTo method, before returning the size, add
{code:java}
  val finalPos = channel.position()
  if (finalPos == initialPos) {
    logWarning("Hit kernel bug; upgrade your kernel. Attempting workaround.")
    channel.position(initialPos + size)
  } else {
    assert(finalPos == initialPos + size)
  }
{code}


From what I understand of the javadoc, this should alleviate the problem - of 
course, it will need verification on the setup you have where it is currently 
failing!
Note that the reason I would prefer this over append is simple: the method is a 
generic method for copying streams - and it might be used (currently, or in the 
future) in scenarios where append is not true. So it would be good to be 
defensive about the final state.
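
Putting (a) and (b) together, a minimal self-contained sketch of the guarded 
copy loop might look like the following (illustrative only - the helper name 
and shape are not the actual Spark {{Utils.copyStream}}):

{code}
import java.io.{FileInputStream, FileOutputStream}

// Hypothetical helper showing where the two snippets above would sit.
def guardedTransferCopy(in: FileInputStream, out: FileOutputStream): Long = {
  val inChannel = in.getChannel()
  val outChannel = out.getChannel()
  val initialPos = outChannel.position()  // (a) remember where we started
  val size = inChannel.size()
  var count = 0L
  // transferTo may copy fewer bytes than requested, so loop until done.
  while (count < size) {
    count += inChannel.transferTo(count, size - count, outChannel)
  }
  val finalPos = outChannel.position()    // (b) verify the position advanced
  if (finalPos == initialPos) {
    // Kernel did not advance the position; apply the suggested workaround.
    outChannel.position(initialPos + count)
  } else {
    assert(finalPos == initialPos + count)
  }
  count
}
{code}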


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173427#comment-14173427
 ] 

Saisai Shao commented on SPARK-3948:


Hi [~mridulm80], thanks a lot for your suggestions, here is the snippet I 
changed:

{code}
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val initialPos = outChannel.position()
println("size = " + outChannel.size)
println("initial position = " + initialPos)
val size = inChannel.size()
// In case the transferTo method transferred less data than we required.
while (count < size) {
  count += inChannel.transferTo(count, size - count, outChannel)
}
val finalPos = outChannel.position()
println("final position = " + finalPos)
if (initialPos == finalPos) {
  outChannel.position(initialPos + count)
} else {
  assert(finalPos == initialPos + count)
}
{code}

And the result is shown below:

{noformat}
size = 0
initial position = 0
final position = 0
size = 118 for i = 0
size = 118
initial position = 118
final position = 118
size = 118 for i = 1
size = 118
initial position = 236
final position = 236
size = 118 for i = 2
size = 118
initial position = 354
final position = 354
size = 118 for i = 3
size = 118
initial position = 472
final position = 472
size = 118 for i = 4
size = 118
initial position = 590
final position = 590
size = 118 for i = 5
size = 118
initial position = 708
final position = 708
size = 118 for i = 6
size = 118
initial position = 826
final position = 826
size = 118 for i = 7
size = 118
initial position = 944
final position = 944
size = 118 for i = 8
size = 118
initial position = 1062
final position = 1062
size = 118 for i = 9
{noformat}

It still has the problem on my 2.6.32-kernel machine: though the position moves 
forward, the file size stays at 118. But it is OK on my Ubuntu machine, so 
probably this workaround is not feasible.
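
For anyone trying to reproduce this outside Spark, a hypothetical standalone 
harness along the lines of the instrumented run above might look like the 
following (the file names and the 118-byte input size are illustrative, taken 
from the log):

{code}
import java.io.{File, FileInputStream, FileOutputStream}

// Repeatedly transferTo-append one small file into an output file and watch
// whether the channel position and the on-disk file size advance together.
object TransferToRepro {
  def main(args: Array[String]): Unit = {
    val src = new File("input.bin")  // assumed to contain 118 bytes
    val out = new FileOutputStream(new File("merged.bin"))
    val outChannel = out.getChannel()
    for (i <- 0 until 10) {
      val in = new FileInputStream(src)
      val inChannel = in.getChannel()
      val size = inChannel.size()
      var count = 0L
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
      in.close()
      println(s"i = $i, position = ${outChannel.position()}, " +
        s"file size = ${outChannel.size()}")
    }
    out.close()
  }
}
{code}

On an affected kernel, the log above suggests the reported file size would stop 
growing even while the position is pushed forward.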


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173489#comment-14173489
 ] 

Mridul Muralidharan commented on SPARK-3948:



Damn, this sucks: transferTo is not using the position of the channel to write 
the output ... while it does do so when append is true (which effectively sets 
the position to the end of the file on the call to getChannel).
The state of the channel, based on what we see above, is the same in both cases 
- since we can see the position is updated - and it is persisted and returned 
when we call getChannel in the next invocation of copyStreams.
So there is some other set of issues at play that we might not be able to work 
around from the JVM.


Given this, I think we should:
a) add a logError when initialPosition == finalPosition while inChannel.size > 0, 
asking users to upgrade to a newer Linux kernel
b) of course use append = true, to work around the immediate issues.

(a) will ensure that developers and users/admins are notified of the issue in 
case other code paths (currently or in the future) hit the same problem.
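
A minimal sketch of (a) and (b) combined, with illustrative names (the 
println-to-stderr stands in for Spark's logError):

{code}
import java.io.{File, FileInputStream, FileOutputStream}

// Hypothetical combined fix: open the destination in append mode (b) and
// still complain loudly if transferTo leaves the position untouched (a).
def appendWithTransferTo(src: File, dst: File): Long = {
  val inChannel = new FileInputStream(src).getChannel()
  val outChannel = new FileOutputStream(dst, true).getChannel()  // append
  val initialPos = outChannel.position()
  val size = inChannel.size()
  var count = 0L
  while (count < size) {
    count += inChannel.transferTo(count, size - count, outChannel)
  }
  if (outChannel.position() == initialPos && size > 0) {
    // logError equivalent: make the kernel bug visible to users and admins.
    System.err.println("transferTo did not advance the channel position; " +
      "please upgrade to a newer Linux kernel.")
  }
  inChannel.close()
  outChannel.close()
  count
}
{code}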




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173528#comment-14173528
 ] 

Saisai Shao commented on SPARK-3948:


Hi [~mridulm80],

Thanks a lot for your suggestion. I think the other code in Spark that 
currently uses copyStream will not be affected by this issue, since it only 
copies one input file to one output file. But use cases like ExternalSorter 
will indeed be affected by it. I will submit a PR according to your 
suggestions; thanks a lot.


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173757#comment-14173757
 ] 

Apache Spark commented on SPARK-3948:
-

User 'jerryshao' has created a pull request for this issue:
https://github.com/apache/spark/pull/2824




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172055#comment-14172055
 ] 

Saisai Shao commented on SPARK-3948:


Hi Josh, thanks for your help. I don't think an un-flushed input file is what 
makes the output-file copy length wrong, since the bunch of 
{{partitionWriters}} are flushed and closed before being copied to the output 
file, as you can see at line 704 in ExternalSorter: 
{{partitionWriters.foreach(_.commitAndClose())}}.

I still suspect that the current use of {{transferTo}} may have some problem 
that ruins the output file. I tried adding the append flag for the output file 
at line 708, and the problem seems to be gone.



[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172062#comment-14172062
 ] 

Josh Rosen commented on SPARK-3948:
---

Hi [~jerryshao],

To make sure that I understand, are you saying that changing

{code}
out = new FileOutputStream(outputFile)
{code}

to

{code}
out = new FileOutputStream(outputFile, true)  // append = true
{code}

seems to solve the issue, even while continuing to use the {{transferTo}} code?




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172080#comment-14172080
 ] 

Josh Rosen commented on SPARK-3948:
---

I'm surprised that the lack of {{append}} didn't cause problems prior to the 
{{transferTo}} patch.  To try to reason this out:

*Old code, no append*: We use the same FileOutputStream object to write all of 
the partitions, copying each partition's data by calling 
{{FileOutputStream.write}} on that same object.  Since we're not in append 
mode, the first partition is written at the beginning of the file and all 
subsequent partitions are appended after it.

*New {{transferTo}} code, no append:* We call {{FileOutputStream.getChannel()}} 
for each partition; which [returns the unique FileChannel object associated 
with this file output 
stream|http://docs.oracle.com/javase/7/docs/api/java/io/FileOutputStream.html#getChannel()].
  Since we're not in append mode, this channel's initial position should be 
equal to the number of bytes written to the file so far, which should 
initially be 0 since we just created a new FileOutputStream and haven't written 
anything using it.  According to [its 
docs|http://docs.oracle.com/javase/6/docs/api/java/nio/channels/FileChannel.html#transferTo(long,
 long, java.nio.channels.WritableByteChannel)], {{transferTo}} should increment 
the target channel's position by the number of bytes written.  So, in this case 
it also seems like the first partition should be written to the beginning of 
the file while others are appended after it.

Am I missing an obvious bug here?  Is there a reason why these two copy 
implementations don't behave identically when append is disabled? 
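
To make the comparison concrete, here is an illustrative side-by-side of the 
two copy paths described above (simplified; not the actual Utils.copyStream):

{code}
import java.io.{FileInputStream, FileOutputStream}

// Old path: buffer-at-a-time copy through the stream API. Writes always land
// at the stream's current offset, so partitions pile up one after another.
def streamCopy(in: FileInputStream, out: FileOutputStream): Unit = {
  val buf = new Array[Byte](8192)
  var n = in.read(buf)
  while (n != -1) {
    out.write(buf, 0, n)
    n = in.read(buf)
  }
}

// New path: zero-copy via the channel API. Per the javadoc, transferTo should
// advance outChannel.position() by the bytes written - the behavior at issue.
def channelCopy(in: FileInputStream, out: FileOutputStream): Unit = {
  val inChannel = in.getChannel()
  val outChannel = out.getChannel()
  val size = inChannel.size()
  var count = 0L
  while (count < size) {
    count += inChannel.transferTo(count, size - count, outChannel)
  }
}
{code}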


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172116#comment-14172116
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~joshrosen] Assuming there are no VM bugs being hit in inChannel.size() and no 
other concurrent writes to these files, I don't see any issues with the code - 
as you elaborated.
On the other hand, the external sort code is slightly loose w.r.t. its use of 
the file API - not sure if that is causing the observed problems: for example, 
the use of skip() in SortShuffleManager.scala.
We will need to investigate in detail whether some of these are causing the 
observed problems.
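
(For context on the skip() remark: {{InputStream.skip(n)}} is documented to 
possibly skip fewer than n bytes, so a single unchecked call is a latent bug. 
A defensive pattern - a sketch with a hypothetical helper name - looks like:)

{code}
import java.io.{EOFException, InputStream}

// Hypothetical defensive skip: loop until the requested count is consumed,
// falling back to read() to make progress or to detect end-of-stream.
def skipFully(in: InputStream, n: Long): Unit = {
  var remaining = n
  while (remaining > 0) {
    val skipped = in.skip(remaining)
    if (skipped <= 0) {
      if (in.read() == -1) {
        throw new EOFException(s"Unable to skip $remaining more bytes")
      }
      remaining -= 1
    } else {
      remaining -= skipped
    }
  }
}
{code}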


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172117#comment-14172117
 ] 

Saisai Shao commented on SPARK-3948:


Hi Josh,

I think for the old code, the behavior is correct even without the {{append}} 
flag, because we never close the output stream between different partitions, 
so the input stream for each partition always writes to the end of the output 
stream.

For the new {{transferTo}} code without append, theoretically the target 
channel's position should advance by the number of bytes written, but 
according to my investigation through the logs, for the subsequent partitions 
the output channel's position unexpectedly pointed to 0 rather than the 
previous output file size; that unexplained behavior is what confuses me.
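
As a side note on the {{append}} flag: the JDK documents that a channel 
obtained from a {{FileOutputStream}} opened in append mode is initially 
positioned at the size of the file, which is exactly the invariant the merge 
step needs. A small illustrative sketch (file name and sizes hypothetical):

{code:java}
import java.io.{File, FileOutputStream}

val f = File.createTempFile("spark3948", ".data")

// Write an initial chunk through a fresh stream, then close it.
val first = new FileOutputStream(f)
first.write(Array.fill[Byte](100)(1))
first.close()

// Reopened in append mode: per the FileOutputStream.getChannel() docs,
// the channel's initial position equals the file size, so subsequent
// writes land at the end rather than overwriting earlier partitions.
val out = new FileOutputStream(f, true)
val ch = out.getChannel()
println(s"size = ${ch.size()}, position = ${ch.position()}")  // expect 100, 100
out.close()
{code}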

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172134#comment-14172134
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~jerryshao] Just to clarify, what exactly is the behavior you are observing?
- Is it that getChannel is returning a channel whose position == 0 after 
writing bytes to the stream (size > 0)?
If yes, what is the channel's length you are observing in that case?

Also, how large are the file sizes?

The documentation of getChannel and transferTo is fairly unambiguous ... so 
our code, as written, conforms to it. Of course, it is always possible we are 
hitting some bugs in some scenarios!
What is the environment you are running this on, btw? OS/JVM version? Thanks.


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172152#comment-14172152
 ] 

Saisai Shao commented on SPARK-3948:


Hi [~mridul], what I observed is that, after writing bytes to the channel, the 
next call still reports position 0. Here is one of the logs that I traced:

{noformat}
java.lang.AssertionError: assertion failed: outChannel before size, 272314, 
before position: 0. Current size: 284167, current position: 0, expected write 
data size: 284167
{noformat}

I just print out the channel size and position *before* and *after* the 
{{transferTo}} call. As you can see, the output file size is not 0 before the 
write, but the position is 0 rather than the end of the channel. After writing 
to it with {{transferTo}}, the file size changes, but not to 272314 + 284167 
as expected, and the position also does not move to the end.

Also, when I add the append flag as I mentioned before, the log changes to:

{noformat}
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 0, before position: 
0. Current size: 294896, current position: 294896, expected write data size: 
294896
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 294896, before 
position: 294896. Current size: 551216, current position: 551216, expected 
write data size: 256320
{noformat}

My OS is Red Hat 6.2, and the JVM is 1.8.0_20-ea.

Appreciate your suggestions, thanks a lot.
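
(For reference, the instrumentation described above might look roughly like 
the following sketch; the method and variable names are illustrative, not the 
actual traced patch.)

{code:java}
import java.io.{FileInputStream, FileOutputStream}

// Record the destination channel's size and position before and after
// the transferTo loop, mirroring the log lines quoted above.
def tracedCopy(in: FileInputStream, out: FileOutputStream): Long = {
  val inChannel = in.getChannel()
  val outChannel = out.getChannel()
  val beforeSize = outChannel.size()
  val beforePos = outChannel.position()
  val size = inChannel.size()
  var count = 0L
  while (count < size) {
    count += inChannel.transferTo(count, size - count, outChannel)
  }
  println(s"outChannel before size, $beforeSize, before position: $beforePos. " +
    s"Current size: ${outChannel.size()}, current position: ${outChannel.position()}, " +
    s"expected write data size: $size")
  count
}
{code}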


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172225#comment-14172225
 ] 

Mridul Muralidharan commented on SPARK-3948:


That is weird; I tried a stripped-down version to test just this, and it seems 
to be working fine.

In the Scala interpreter, this works as expected:
{code:java}
import java.io._
import java.nio.ByteBuffer
import java.nio._
import java.nio.channels._

def copyStream(in: InputStream,
               out: OutputStream,
               closeStreams: Boolean = false): Long =
{
  var count = 0L
  try {
    if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
      // When both streams are file streams, use transferTo to improve copy performance.
      val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      println("size = " + outChannel.size)
      println("position = " + outChannel.position)
      val size = inChannel.size()
      // In case the transferTo method transferred less data than we required.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
    } else {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          out.write(buf, 0, n)
          count += n
        }
      }
    }
    count
  } finally {
    if (closeStreams) {
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }
}

val out = new FileOutputStream("output")
for (i <- 0 until 10) {
  val in = new FileInputStream("t")
  val size = copyStream(in, out, false)
  println("size = " + size + " for i = " + i)
  in.close()
}
out.close()
{code}


Scenarios tried:
a) No output file.
b) Empty output file.
c) Non-empty output file.

And it seemed to work fine (and as expected) in all cases.
Can you try this at your end? I want to eliminate any potential environment 
issues.
I tried this with 1.7.0_55 and 1.8.0_25 ...

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172226#comment-14172226
 ] 

Mridul Muralidharan commented on SPARK-3948:


Note, {{t}} is just some file with a few strings in it - simply generate 
something locally.

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172371#comment-14172371
 ] 

Saisai Shao commented on SPARK-3948:


Hi [~mridulm80], thanks a lot for your suggestions. I tested with different 
Java versions on two OSes, Red Hat 6.2 and Ubuntu 14.04, and I think this 
behavior is OS- or kernel-related, not JVM-version-related; you can see the 
output below.

First is the result under Red Hat, where I tried Java 1.8.0_20, 1.7.0_60 and 
1.7.0_04:

{noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 0
size = 118 for i = 1
size = 118
position = 0
size = 118 for i = 2
size = 118
position = 0
size = 118 for i = 3
size = 118
position = 0
size = 118 for i = 4
size = 118
position = 0
size = 118 for i = 5
size = 118
position = 0
size = 118 for i = 6
size = 118
position = 0
size = 118 for i = 7
size = 118
position = 0
size = 118 for i = 8
size = 118
position = 0
size = 118 for i = 9
{noformat}

Obviously the position is always 0, so the final output file size is just 118.

Then I switched to Ubuntu 14.04, with Java 1.7.0_04 and 1.7.0_51; the result 
is shown below:

{noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 118
size = 118 for i = 1
size = 236
position = 236
size = 118 for i = 2
size = 354
position = 354
size = 118 for i = 3
size = 472
position = 472
size = 118 for i = 4
size = 590
position = 590
size = 118 for i = 5
size = 708
position = 708
size = 118 for i = 6
size = 826
position = 826
size = 118 for i = 7
size = 944
position = 944
size = 118 for i = 8
size = 1062
position = 1062
size = 118 for i = 9
{noformat}

This is the result we expected, and the position also seeks to the right end, 
so the final output file size is 1180 as expected.

My Ubuntu machine's kernel version is 3.13.0-37-generic, and the Red Hat 
machine's kernel version is 2.6.32-220.el6.x86_64.

So I guess the difference depends on the kernel or OS version, but this still 
needs to be verified.

Besides, to keep things consistent, I think we can add the *append=true* flag 
to enforce the same output across different platforms.
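
For illustration, the append change is essentially the two-argument 
constructor (a sketch, not the exact patch; the file path is hypothetical):

{code:java}
import java.io.{File, FileOutputStream}

// Hypothetical path; stands in for the consolidated shuffle output file.
val partitionFile = new File("/tmp/shuffle_6_11_0.data")

// Before: new FileOutputStream(partitionFile) - on the affected kernels
// the channel position stayed at 0 between partitions.
// After: append = true forces every write to the end of the file,
// regardless of what position() reports.
val out = new FileOutputStream(partitionFile, true)
out.close()
{code}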


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172582#comment-14172582
 ] 

Josh Rosen commented on SPARK-3948:
---

That kernel bug seems like it could explain why the {{transferTo}} change 
caused problems in sort-based shuffle.  It looks like SPARK-3630 also describes 
some other places where the Kryo PARSING_ERROR(2) has occurred, so I'm going to 
try to figure out which of these other cases might also hit this {{transferTo}} 
code path.

In terms of a bug fix / workaround: maybe we can open the file in append mode, 
since that seems to solve the problem, and also add an internal configuration 
option to disable the transferTo code; this configuration / feature-flag would 
provide an escape hatch for users in case the append fix doesn't work.
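
Such a feature flag might be wired up roughly as below. This is a sketch: the 
property name "spark.file.transferTo" is an assumption here, and sys.props 
stands in for SparkConf.

{code:java}
import java.io.{FileInputStream, FileOutputStream, InputStream, OutputStream}

def copyStream(in: InputStream, out: OutputStream): Long = {
  // Feature flag / escape hatch; property name is an assumption.
  val transferToEnabled =
    sys.props.getOrElse("spark.file.transferTo", "true").toBoolean
  (in, out) match {
    case (fin: FileInputStream, fout: FileOutputStream) if transferToEnabled =>
      // NIO path: channel-to-channel transferTo.
      val inCh = fin.getChannel()
      val outCh = fout.getChannel()
      val size = inCh.size()
      var count = 0L
      while (count < size) {
        count += inCh.transferTo(count, size - count, outCh)
      }
      count
    case _ =>
      // Fallback: the old buffered read/write loop.
      val buf = new Array[Byte](8192)
      var count = 0L
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        count += n
        n = in.read(buf)
      }
      count
  }
}
{code}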

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172585#comment-14172585
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~jerryshao] great work!
I agree, append might be a workaround to consider (given the semantics of 
getChannel when the stream is opened with append).
On the other hand, since this piece of code (copyStreams) might also be used 
in a more general context - what about logging a warning when position != 
initialPosition + size at the end of the transferTo loop, warning users that 
they should upgrade their kernel (and explicitly setting the position as a 
workaround)?
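
A sketch of that guard (names illustrative, not a committed patch):

{code:java}
import java.nio.channels.FileChannel

// After the transferTo loop, verify that the destination channel
// advanced by exactly `size` bytes; warn about the kernel issue and
// reposition explicitly if it did not.
def verifyTransfer(outChannel: FileChannel, initialPos: Long, size: Long): Unit = {
  val finalPos = outChannel.position()
  if (finalPos != initialPos + size) {
    System.err.println(
      s"position after transferTo ($finalPos) != expected (${initialPos + size}); " +
        "your kernel may mishandle transferTo to append-mode channels - consider upgrading")
    outChannel.position(initialPos + size)
  }
}
{code}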



[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172895#comment-14172895
 ] 

Josh Rosen commented on SPARK-3948:
---

[~mridulm80], that explicit position-checking and modification idea is a clever 
workaround.

I think that this JIRA explains the cause of the stream-corruption issues that 
we've seen in the sort-based shuffle code, but after some investigation I think 
that it doesn't explain the similar errors that we've seen when deserializing 
tasks that were broadcasted with TorrentBroadcast; I'll keep investigating this.

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-14 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171920#comment-14171920
 ] 

Patrick Wendell commented on SPARK-3948:


Great, thanks [~jerryshao] - I updated the description. It sounds like this is 
just an issue with something in the sort-based code that can cause all kinds 
of random stream exceptions.

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-14 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172024#comment-14172024
 ] 

Josh Rosen commented on SPARK-3948:
---

I've been looking over the code in https://github.com/apache/spark/pull/1884 
and I haven't spotted anything glaringly wrong.  One hunch, though: in the 
{{transferTo}} code, we read the size of the file up front and then copy that 
many bytes, whereas the old, non-{{transferTo}} code keeps reading until it 
hits the end of the stream.  This could potentially account for different 
behavior if the size of the underlying file increased while the copy was taking 
place.
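
To make the hunch concrete, here is a minimal sketch of the two loops being 
contrasted (illustrative helper names): the transferTo path copies a snapshot 
of the size taken up front, while the buffered path keeps reading until EOF.

{code:java}
import java.io.{FileInputStream, FileOutputStream}

// transferTo path: `size` is read once, so bytes appended to the source
// after that point are never copied.
def copyFixedSize(in: FileInputStream, out: FileOutputStream): Long = {
  val inCh = in.getChannel()
  val outCh = out.getChannel()
  val size = inCh.size()  // snapshot taken up front
  var count = 0L
  while (count < size) {
    count += inCh.transferTo(count, size - count, outCh)
  }
  count
}

// Old path: reads until EOF, so it naturally picks up late growth.
def copyUntilEof(in: FileInputStream, out: FileOutputStream): Long = {
  val buf = new Array[Byte](8192)
  var count = 0L
  var n = in.read(buf)
  while (n != -1) {
    out.write(buf, 0, n)
    count += n
    n = in.read(buf)
  }
  count
}
{code}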
