[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172055#comment-14172055
 ] 

Saisai Shao commented on SPARK-3948:


Hi Josh, thanks for your help. I don't think an un-flushed input file is what 
makes the output file's copied length wrong, since the {{partitionWriters}} are 
all flushed and closed before being copied to the output file, as you can see 
at line 704 in ExternalSorter: {{partitionWriters.foreach(_.commitAndClose())}}.

I still suspect that the current use of {{transferTo}} has some problem that 
corrupts the output file. When I add the append flag for the output file at 
line 708, the problem seems to go away.
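
For concreteness, here is a minimal sketch (not the actual ExternalSorter code; 
names are illustrative) of the kind of change being described: opening the 
consolidated output file in append mode so that each partition copy continues at 
the end of the file instead of relying on the channel's current position.

{code:java}
import java.io.{File, FileInputStream, FileOutputStream}

// Illustrative only: copy one partition file onto the end of the consolidated
// output file, with the output stream opened in append mode (second
// constructor argument set to true).
def copyPartitionAppend(partitionFile: File, outputFile: File): Long = {
  val in = new FileInputStream(partitionFile)
  val out = new FileOutputStream(outputFile, true) // append = true
  try {
    val inChannel = in.getChannel
    val outChannel = out.getChannel
    var copied = 0L
    val size = inChannel.size()
    // transferTo may copy fewer bytes than requested, so loop until done.
    while (copied < size) {
      copied += inChannel.transferTo(copied, size - copied, outChannel)
    }
    copied
  } finally {
    in.close()
    out.close()
  }
}
{code}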

 Sort-based shuffle can lead to assorted stream-corruption exceptions
 

 Key: SPARK-3948
 URL: https://issues.apache.org/jira/browse/SPARK-3948
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao

 Several exceptions occurred when running TPC-DS queries against the latest master 
 branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in Snappy, 
 deserialization errors in Kryo, and offset out-of-range errors in FileManagedBuffer; 
 all of these exceptions go away when we switch to hash-based shuffle.
 After deep investigation, we found that some shuffle output files are 
 unexpectedly smaller than the others, as the log shows:
 {noformat}
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
 {noformat}
 As you can see, the total file length of shuffle_6_11_11 is much smaller than 
 the other map outputs of the same stage.
 We also dumped the map outputs on the map side to check whether this small 
 output is correct; below is the log:
 {noformat}
  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405, length:
  274722 262597 291290 272902 264941 270358 291005 295285 252482 
 287142 232617 259871 233734 241439 228897 234282 253834 235619 
 233803 255532 270739 253825 262087 266404 234273 250120 262983 
 257024 255947 254971 258908 247862 221613 258566 245399 251684 
 274843 226150 264278 245279 225656 235084 239466 212851 242245 
 218781 222191 215500 211548 234256 208601 204113 191923 217895 
 227020 215331 212313 223725 250876 256875 239276 266777 235520 
 237462 234063 242270 246825 255888 235937 236956 233099 264508 
 260303 233294 239061 254856 257475 230105 246553 260412 210355 
 211201 219572 206636 226866 209937 226618 218208 206255 248069 
 221717 222112 215734 248088 239207 246125 239056 241133 253091 
 246738 233128 242794 231606 255737 221123 252115 247286 229688 
 251087 250047 237579 263079 256251 238214 208641 201120 204009 
 200825 211965 200600 194492 226471 194887 226975 215072 206008 
 233288 222132 208860 219064 218162 237126 220465 201343 225711 
 232178 233786 212767 211462 213671 215853 227822 233782 214727 
 247001 228968 247413 222674 214241 184122 215643 207665 219079 
 215185 207718 212723 201613 216600 212591 208174 204195 208099 
 229079 230274 223373 214999 256626 228895 231821 383405 229646 
 220212 245495 245960 227556 213266 237203 203805 240509 239306 
 242365 218416 238487 219397 240026 251011 258369 255365 259811 
 283313 248450 264286 264562 257485 279459 249187 257609 274964 
 292369 273826
 {noformat}
 Here I dump the file name, the file length, and each partition's length; 
 obviously the sum of all partition lengths does not equal the file length. So I 
 think there may be a situation where the partitionWriter in ExternalSorter does 
 not always append to the end of the previously written file, parts of the 
 file's content get overwritten, and this leads to the exceptions I mentioned 
 before.
 Also, when I changed copyStream to disable transferTo and use the previous 
 implementation, all the issues were gone. So I think there may be some flushing 
 problem in transferTo when the processed data is large.
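
For reference, a minimal sketch of the kind of buffered fallback the description 
refers to as "the previous one" (an illustration of the non-transferTo copy path, 
not Spark's exact Utils.copyStream):

{code:java}
import java.io.{InputStream, OutputStream}

// Illustrative buffered copy: unlike the transferTo path, it never touches the
// output channel's position, so every byte is written through the already-open
// output stream in order.
def bufferedCopy(in: InputStream, out: OutputStream): Long = {
  val buf = new Array[Byte](8192)
  var count = 0L
  var n = in.read(buf)
  while (n != -1) {
    out.write(buf, 0, n)
    count += n
    n = in.read(buf)
  }
  count
}
{code}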





[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172062#comment-14172062
 ] 

Josh Rosen commented on SPARK-3948:
---

Hi [~jerryshao],

To make sure that I understand, are you saying that changing

{code}
out = new FileOutputStream(outputFile)
{code}

to

{code}
out = new FileOutputStream(outputFile, /* append = */ true)
{code}

seems to solve the issue, even while continuing to use the {{transferTo}} code?




[jira] [Created] (SPARK-3954) promote the speed of convert files to RDDS

2014-10-15 Thread JIRA
宿荣全 created SPARK-3954:
--

 Summary:  promote the speed of convert files to RDDS
 Key: SPARK-3954
 URL: https://issues.apache.org/jira/browse/SPARK-3954
 Project: Spark
  Issue Type: Improvement
  Components: Input/Output
Affects Versions: 1.1.0, 1.0.0
Reporter: 宿荣全


When converting files to RDDs, the Spark source traverses the file sequence 
three times:
1. files.map(...)
2. files.zip(fileRDDs)
3. a foreach over the zipped file sequence
This becomes very time consuming when there are lots of files, so I made the 
following change: the three traversals of the file sequence are collapsed into 
a single loop.

spark source code:
  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
    files.zip(fileRDDs).foreach { case (file, rdd) =>
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
    }
    new UnionRDD(context.sparkContext, fileRDDs)
  }
// ---------------------------------------------------------------
modified code:
  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
      rdd
    }
    new UnionRDD(context.sparkContext, fileRDDs)
  }








[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172080#comment-14172080
 ] 

Josh Rosen commented on SPARK-3948:
---

I'm surprised that the lack of {{append}} didn't cause problems prior to the 
{{transferTo}} patch.  To try to reason this out:

*Old code, no append*: We use the same FileOutputStream object to write all of 
the partitions, copying each partition's data by calling {{FileOutputStream.write}} 
on that one stream.  Since we're not in append mode, the first partition is 
written at the beginning of the file and all subsequent partitions are appended 
to it.

*New {{transferTo}} code, no append:* We call {{FileOutputStream.getChannel()}} 
for each partition; which [returns the unique FileChannel object associated 
with this file output 
stream|http://docs.oracle.com/javase/7/docs/api/java/io/FileOutputStream.html#getChannel()].
  Since we're not in append mode, this channel's initial position should be 
equal to the number of bytes written to the file so far, which should 
initially be 0 since we just created a new FileOutputStream and haven't written 
anything using it.  According to [its 
docs|http://docs.oracle.com/javase/6/docs/api/java/nio/channels/FileChannel.html#transferTo(long,
 long, java.nio.channels.WritableByteChannel)], {{transferTo}} should increment 
the target channel's position by the number of bytes written.  So, in this case 
it also seems like the first partition should be written to the beginning of 
the file while others are appended after it.

Am I missing an obvious bug here?  Is there a reason why these two copy 
implementations don't behave identically when append is disabled? 
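
To make the expected behavior concrete, here is a small self-contained sketch 
(not Spark code; file names are placeholders) of the documented semantics: a 
channel obtained from a freshly created, non-append FileOutputStream starts at 
position 0, and transferTo is documented to advance the target channel's 
position by the number of bytes written.

{code:java}
import java.io.{FileInputStream, FileOutputStream}

// "input.bin" and "output.bin" are placeholder file names for illustration.
val out = new FileOutputStream("output.bin") // no append flag
val outChannel = out.getChannel
println("initial position = " + outChannel.position()) // expected: 0

val in = new FileInputStream("input.bin")
val inChannel = in.getChannel
// A single transferTo call may copy fewer bytes than requested; this sketch
// ignores that for brevity.
val written = inChannel.transferTo(0, inChannel.size(), outChannel)
// Per the FileChannel javadoc, the target channel's position should now have
// advanced by `written`, so a subsequent transferTo appends after this data.
println("written = " + written + ", position = " + outChannel.position())

in.close()
out.close()
{code}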


[jira] [Created] (SPARK-3955) Different versions between jackson-mapper-asl and jackson-core-asl

2014-10-15 Thread Jongyoul Lee (JIRA)
Jongyoul Lee created SPARK-3955:
---

 Summary: Different versions between jackson-mapper-asl and 
jackson-core-asl
 Key: SPARK-3955
 URL: https://issues.apache.org/jira/browse/SPARK-3955
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 1.1.0
Reporter: Jongyoul Lee


The parent pom.xml specifies a version for jackson-mapper-asl, which is used by 
sql/hive/pom.xml. When mvn assembly runs, however, the jackson-mapper-asl version 
does not match jackson-core-asl. This is because other libraries depend on several 
versions of Jackson, so a different version of jackson-core-asl gets assembled. 
Simply put, the problem is fixed if pom.xml pins a specific version of 
jackson-core-asl. If it is not set, version 1.9.11 is merged into assembly.jar 
and we cannot use the Jackson library properly.

{code}
[INFO] Including org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8 in the 
shaded jar.
[INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.11 in the shaded 
jar.
{code}






[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172116#comment-14172116
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~joshrosen] Assuming we are not hitting VM bugs in inChannel.size() or some 
other concurrent writes to these files, I don't see any issue with the code, as 
you elaborated.
On the other hand, the external sort code is slightly loose w.r.t. its use of the 
file API; I'm not sure whether that is causing the observed problems: for 
example, the use of skip() in SortShuffleManager.scala.
We will need to investigate in detail whether some of these are causing the 
observed problems.
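
As a general illustration of the skip() pitfall alluded to above (generic Java 
I/O behavior, not the actual SortShuffleManager code): InputStream.skip may skip 
fewer bytes than requested, so callers that assume a single call lands at the 
desired offset can silently read from the wrong position.

{code:java}
import java.io.{EOFException, InputStream}

// Defensive skip loop: keep calling skip() until the requested number of bytes
// has actually been skipped, probing with read() when skip() makes no progress.
def skipFully(in: InputStream, bytes: Long): Unit = {
  var remaining = bytes
  while (remaining > 0) {
    val skipped = in.skip(remaining)
    if (skipped > 0) {
      remaining -= skipped
    } else if (in.read() == -1) {
      throw new EOFException("Reached end of stream with " + remaining + " bytes left to skip")
    } else {
      remaining -= 1 // the read() probe consumed one byte
    }
  }
}
{code}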


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172117#comment-14172117
 ] 

Saisai Shao commented on SPARK-3948:


Hi Josh,

I think the old code behaves correctly even without the {{append}} flag, because 
we never close the output stream between partitions, so each partition's input is 
always written to the current end of the output stream; I think that is correct.

For the new {{transferTo}} code without append, theoretically it is correct that 
the target channel's position should advance by the number of bytes written, but 
according to my investigation through the logs, I found that for subsequent 
partitions the output channel's position unexpectedly points to 0 rather than to 
the previous output file size. That is the unexplained behavior that confuses me 
a lot.


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172134#comment-14172134
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~jerryshao] Just to clarify, what exactly is the behavior you are observing?
- Is it that getChannel returns a channel whose position == 0 even after bytes 
have been written to the stream (while size > 0)?
If yes, what channel length are you observing in that case?

Also, how large are the files?

The documentation of getChannel and transferTo is fairly unambiguous, so our 
code, as written, conforms to it. Of course, it is always possible that we are 
hitting some bugs in some scenarios!
What is the environment you are running this on, by the way? OS/JVM version? Thanks.




[jira] [Commented] (SPARK-3955) Different versions between jackson-mapper-asl and jackson-core-asl

2014-10-15 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172148#comment-14172148
 ] 

Sean Owen commented on SPARK-3955:
--

Looks like Jackson is managed to version 1.8.8 for Avro reasons. I think core 
just needs to be managed the same way. I'll try it locally to make sure that 
works.




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172152#comment-14172152
 ] 

Saisai Shao commented on SPARK-3948:


Hi [~mridul], what I observed is that, after writing bytes to the channel, the 
next call still sees position 0. Here is one of the logs I traced:

{noformat}
java.lang.AssertionError: assertion failed: outChannel before size, 272314, 
before position: 0. Current size: 284167, current position: 0, expected write 
data size: 284167
{noformat}

I simply print the channel size and position *before* and *after* the 
{{transferTo}} call. As you can see, the output file size is not 0 before the 
write, yet the position is 0 rather than the end of the channel. After writing to 
it with {{transferTo}}, the file size changes, but not to 272314 + 284167 as 
expected, and the position still does not move to the end.
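
A minimal sketch of the kind of instrumentation being described (variable names 
and the message format are illustrative, not the exact code used), assuming it 
wraps the transferTo loop inside copyStream:

{code:java}
import java.nio.channels.FileChannel

// Record the output channel's size and position before and after the transfer,
// so an unexpected position (e.g. stuck at 0) is visible immediately.
def instrumentedTransfer(inChannel: FileChannel, outChannel: FileChannel): Long = {
  val beforeSize = outChannel.size()
  val beforePos = outChannel.position()

  var count = 0L
  val size = inChannel.size()
  while (count < size) {
    count += inChannel.transferTo(count, size - count, outChannel)
  }

  println("outChannel before size: " + beforeSize + ", before position: " + beforePos +
    ". Current size: " + outChannel.size() + ", current position: " + outChannel.position() +
    ", expected write data size: " + size)
  count
}
{code}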

When I add the append flag as I mentioned before, the log changes to:

{noformat}
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 0, before position: 
0. Current size: 294896, current position: 294896, expected write data size: 
294896
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 294896, before 
position: 294896. Current size: 551216, current position: 551216, expected 
write data size: 256320
{noformat}

My OS is Red Hat 6.2, and the JVM is 1.8.0_20-ea.

I appreciate your suggestions, thanks a lot.



[jira] [Closed] (SPARK-3898) History Web UI display incorrectly.

2014-10-15 Thread zzc (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zzc closed SPARK-3898.
--
  Resolution: Not a Problem
Target Version/s: 1.2.0

 History Web UI display incorrectly.
 ---

 Key: SPARK-3898
 URL: https://issues.apache.org/jira/browse/SPARK-3898
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 1.2.0
 Environment: Spark 1.2.0-snapshot On Yarn
Reporter: zzc
 Fix For: 1.2.0


 After successfully running a Spark application, the history web UI displays 
 incorrect information: 
 App Name: Not Started  Started: 1970/01/01 07:59:59  Spark User: Not Started
 Last Updated: 2014/10/10 14:50:39 
 Exception message:
 2014-10-10 14:51:14,284 - ERROR - 
 org.apache.spark.Logging$class.logError(Logging.scala:96) - 
 qtp1594785497-16851 -Exception in parsing Spark event log 
 hdfs://wscluster/sparklogs/24.3g_15_5g_2c-1412923684977/EVENT_LOG_1
 org.json4s.package$MappingException: Did not find value which can be 
 converted into int
 at org.json4s.reflect.package$.fail(package.scala:96)
 at org.json4s.Extraction$.convert(Extraction.scala:554)
 at org.json4s.Extraction$.extract(Extraction.scala:331)
 at org.json4s.Extraction$.extract(Extraction.scala:42)
 at 
 org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
 at 
 org.apache.spark.util.JsonProtocol$.blockManagerIdFromJson(JsonProtocol.scala:647)
 at 
 org.apache.spark.util.JsonProtocol$.blockManagerAddedFromJson(JsonProtocol.scala:468)
 at 
 org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:404)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at 
 org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
 at 
 org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$loadAppInfo(FsHistoryProvider.scala:181)
 at 
 org.apache.spark.deploy.history.FsHistoryProvider.getAppUI(FsHistoryProvider.scala:99)
 at 
 org.apache.spark.deploy.history.HistoryServer$$anon$3.load(HistoryServer.scala:55)
 at 
 org.apache.spark.deploy.history.HistoryServer$$anon$3.load(HistoryServer.scala:53)
 at 
 com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
 at 
 com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
 at 
 com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
 at 
 com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
 at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
 at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
 at 
 com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
 at 
 org.apache.spark.deploy.history.HistoryServer$$anon$1.doGet(HistoryServer.scala:88)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
 at 
 org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
 at 
 org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
 at 
 org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
 at 
 org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
 at 
 org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
 at 
 org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
 at 
 org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
 at org.eclipse.jetty.server.Server.handle(Server.java:370)
 at 
 org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
 at 
 org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
 at 
 

[jira] [Commented] (SPARK-3954) promote the speed of convert files to RDDS

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172167#comment-14172167
 ] 

Apache Spark commented on SPARK-3954:
-

User 'surq' has created a pull request for this issue:
https://github.com/apache/spark/pull/2811




[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172225#comment-14172225
 ] 

Mridul Muralidharan commented on SPARK-3948:


That is weird. I tried a stripped-down version to test just this, and it seems 
to work fine.

In the Scala interpreter, this behaves as expected:
{code:java}
import java.io._
import java.nio.ByteBuffer
import java.nio._
import java.nio.channels._

def copyStream(in: InputStream,
               out: OutputStream,
               closeStreams: Boolean = false): Long = {
  var count = 0L
  try {
    if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
      // When both streams are file streams, use transferTo to improve copy performance.
      val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      println("size = " + outChannel.size)
      println("position = " + outChannel.position)
      val size = inChannel.size()
      // In case the transferTo method transferred less data than we required.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
    } else {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          out.write(buf, 0, n)
          count += n
        }
      }
    }
    count
  } finally {
    if (closeStreams) {
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }
}

val out = new FileOutputStream("output")
for (i <- 0 until 10) {
  val in = new FileInputStream("t")
  val size = copyStream(in, out, false)
  println("size = " + size + " for i = " + i)
  in.close()
}
out.close()
{code}


Scenarios tried:
a) No output file.
b) Empty output file.
c) Non-empty output file.

It seemed to work fine (and as expected) in all cases.
Can you try this at your end? I want to eliminate any potential environment 
issues.
I tried this with 1.7.0_55 and 1.8.0_25 ...


[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172226#comment-14172226
 ] 

Mridul Muralidharan commented on SPARK-3948:


Note: {{t}} is just some file with a few strings in it; simply generate 
something locally.
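
For completeness, one way to generate such a test file locally (the file name and 
contents are arbitrary):

{code:java}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Create a small input file named "t" with a few lines of text, matching what
// the test snippet above reads from.
val lines = (1 to 10).map(i => "line " + i + " of the test input").mkString("\n")
Files.write(Paths.get("t"), lines.getBytes(StandardCharsets.UTF_8))
{code}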




[jira] [Commented] (SPARK-3939) NPE caused by SessionState.out not set in thriftserver2

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172230#comment-14172230
 ] 

Apache Spark commented on SPARK-3939:
-

User 'adrian-wang' has created a pull request for this issue:
https://github.com/apache/spark/pull/2812

 NPE caused by SessionState.out not set in thriftserver2
 ---

 Key: SPARK-3939
 URL: https://issues.apache.org/jira/browse/SPARK-3939
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: Adrian Wang
Assignee: Adrian Wang

 A simple 'set' query can reproduce this in the Thrift server.






[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172371#comment-14172371
 ] 

Saisai Shao commented on SPARK-3948:


Hi [~mridulm80], thanks a lot for your suggestions. I tested with different Java 
versions on two OSes, Red Hat 6.2 and Ubuntu 14.04, and I think this behavior is 
OS or kernel related rather than JVM-version related, as you can see from the 
output below.

First is the result under Red Hat; I tried Java 1.8.0_20, 1.7.0_60 and 1.7.0_04, 
and the result is:

{noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 0
size = 118 for i = 1
size = 118
position = 0
size = 118 for i = 2
size = 118
position = 0
size = 118 for i = 3
size = 118
position = 0
size = 118 for i = 4
size = 118
position = 0
size = 118 for i = 5
size = 118
position = 0
size = 118 for i = 6
size = 118
position = 0
size = 118 for i = 7
size = 118
position = 0
size = 118 for i = 8
size = 118
position = 0
size = 118 for i = 9
{noformat}

Obviously the position is always 0, so the final output file size is just 118.

Then I changed to Ubuntu 14.04, with Java 1.7.0_04 and 1.7.0_51; the result is 
shown below:

{noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 118
size = 118 for i = 1
size = 236
position = 236
size = 118 for i = 2
size = 354
position = 354
size = 118 for i = 3
size = 472
position = 472
size = 118 for i = 4
size = 590
position = 590
size = 118 for i = 5
size = 708
position = 708
size = 118 for i = 6
size = 826
position = 826
size = 118 for i = 7
size = 944
position = 944
size = 118 for i = 8
size = 1062
position = 1062
size = 118 for i = 9
{noformat}

This is the result we expected, and the position also seeks to the right end, so the 
final output file size is 1180 as expected.

My Ubuntu machine's kernel version is: 3.13.0-37-generic; and Redhat machine's 
kernel version is: 2.6.32-220.el6.x86_64.

So I guess the difference depends on the kernel or OS version, but this still needs 
to be verified.

Besides, to keep things consistent, I think we can add an *append=true* flag to 
enforce the same output across different platforms.
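
For reference, here is a rough sketch of the kind of standalone test that produces 
output like the above (a minimal sketch only; the file name {{test.in}} and the 
object name are made up for illustration, not the exact test I ran):

{code}
import java.io.{File, FileInputStream, FileOutputStream}

object TransferToRepro {
  def main(args: Array[String]): Unit = {
    val in = new File("test.in")   // small input file (118 bytes in the runs above)
    val out = new File("test.out")
    // Deliberately opened WITHOUT append, mirroring the code path under suspicion.
    val outChannel = new FileOutputStream(out).getChannel
    for (i <- 0 until 10) {
      val inChannel = new FileInputStream(in).getChannel
      println("size = " + outChannel.size())
      println("position = " + outChannel.position())
      val copied = inChannel.transferTo(0, inChannel.size(), outChannel)
      println("size = " + copied + " for i = " + i)
      inChannel.close()
    }
    outChannel.close()
    // On the affected kernel the position never advances, so the file ends up
    // with only 118 bytes; on a fixed kernel it ends up with 1180 bytes.
    println("final output file length = " + out.length())
  }
}
{code}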


 Sort-based shuffle can lead to assorted stream-corruption exceptions
 

 Key: SPARK-3948
 URL: https://issues.apache.org/jira/browse/SPARK-3948
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao

 Several exceptions occurred when running TPC-DS queries against latest master 
 branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
 deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
 these exceptions are gone when we changed to hash-based shuffle.
 With deep investigation, we found that some shuffle output file is 
 unexpectedly smaller than the others, as the log shows:
 {noformat}
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
 {noformat}
 As you can see the total file length of shuffle_6_11_11 is much smaller than 
 other same stage map output results.
 And we also dump the map outputs in map side to see if this small size output 
 is correct or not, below is the log:
 {noformat}
  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
  274722 262597 291290 272902 264941 270358 291005 295285 252482 
 287142 232617 259871 233734 241439 228897 234282 253834 235619 
 233803 255532 270739 253825 262087 266404 234273 250120 262983 
 257024 255947 254971 258908 247862 221613 258566 245399 251684 
 274843 226150 264278 245279 225656 235084 239466 212851 242245 
 218781 222191 215500 211548 234256 208601 204113 191923 217895 
 227020 215331 212313 223725 250876 256875 239276 266777 235520 
 237462 234063 242270 246825 255888 235937 236956 233099 264508 
 260303 233294 239061 254856 257475 230105 246553 260412 210355 
 211201 219572 206636 226866 209937 226618 218208 206255 248069 
 221717 222112 215734 248088 239207 246125 

[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application

2014-10-15 Thread Praveen Seluka (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172387#comment-14172387
 ] 

Praveen Seluka commented on SPARK-3174:
---

[~andrewor] - Can you also comment on the API exposed to add/delete executors 
from SparkContext? I believe it will be something like sc.addExecutors(count: Int) 
and sc.deleteExecutors(List[String]).

[~sandyr] [~tgraves] [~andrewor] [~vanzin] - Can you please take a look at the 
design doc I have proposed? I am sure there are some pros in doing it this way; 
I have indicated them in detail in the doc. Since it does not change Spark 
Core itself, you could easily replace it with another pluggable algorithm for 
dynamic scaling. I know that Andrew already has a PR based on his design doc, 
but I would surely love to get some feedback.

 Provide elastic scaling within a Spark application
 --

 Key: SPARK-3174
 URL: https://issues.apache.org/jira/browse/SPARK-3174
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core, YARN
Affects Versions: 1.0.2
Reporter: Sandy Ryza
Assignee: Andrew Or
 Attachments: SPARK-3174design.pdf, SparkElasticScalingDesignB.pdf, 
 dynamic-scaling-executors-10-6-14.pdf


 A common complaint with Spark in a multi-tenant environment is that 
 applications have a fixed allocation that doesn't grow and shrink with their 
 resource needs.  We're blocked on YARN-1197 for dynamically changing the 
 resources within executors, but we can still allocate and discard whole 
 executors.
 It would be useful to have some heuristics that
 * Request more executors when many pending tasks are building up
 * Discard executors when they are idle
 See the latest design doc for more information.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2075) Anonymous classes are missing from Spark distribution

2014-10-15 Thread Iulian Dragos (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172394#comment-14172394
 ] 

Iulian Dragos commented on SPARK-2075:
--

The Scala compiler produces stable names for anonymous functions. In fact, 
that's the reason why the name of the enclosing method is part of the name: so 
that adding or removing an anonymous function in another method does not change 
the numbering of the others. Names are assigned by using a per-compilation unit 
counter and a prefix. Looking at the diff, there's quite a different picture in 
the two cases (anonymous functions vs. anonymous classes). Are you sure the two 
jars are built from the same sources?

I don't know how the `assembly` jar is produced, but if it's using some sort of 
whole-program analysis and dead-code elimination, it might erroneously remove 
them. It might help to look at the inputs to the assembly and see if the class 
is already missing.

Another possibility is running `scalac -optimize` in only one of the two 
builds. However, looking at current sources I can't see why the inliner would 
remove those closures (the class is not final, and `map` is not final either, 
so they can't be resolved and inlined).
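
To illustrate the naming scheme (a sketch with a made-up class; the generated class 
names follow the pattern a 2.10-era scalac emits, matching the 
{{RDD$$anonfun$saveAsTextFile$1}} names quoted below):

{code}
// Two anonymous functions inside one method.
class Lines {
  def saveLengths(lines: List[String]): List[Int] = {
    val trimmed = lines.map(_.trim)   // compiled to Lines$$anonfun$saveLengths$1.class
    trimmed.map(_.length)             // compiled to Lines$$anonfun$saveLengths$2.class
  }
}
// Because the enclosing method name is part of the prefix, adding or removing an
// anonymous function in some *other* method does not renumber these two classes.
{code}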

 Anonymous classes are missing from Spark distribution
 -

 Key: SPARK-2075
 URL: https://issues.apache.org/jira/browse/SPARK-2075
 Project: Spark
  Issue Type: Bug
  Components: Build, Spark Core
Affects Versions: 1.0.0
Reporter: Paul R. Brown
Priority: Critical
 Fix For: 1.0.1


 Running a job built against the Maven dep for 1.0.0 and the hadoop1 
 distribution produces:
 {code}
 java.lang.ClassNotFoundException:
 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1
 {code}
 Here's what's in the Maven dep as of 1.0.0:
 {code}
 jar tvf 
 ~/.m2/repository/org/apache/spark/spark-core_2.10/1.0.0/spark-core_2.10-1.0.0.jar
  | grep 'rdd/RDD' | grep 'saveAs'
   1519 Mon May 26 13:57:58 PDT 2014 
 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$1.class
   1560 Mon May 26 13:57:58 PDT 2014 
 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$2.class
 {code}
 And here's what's in the hadoop1 distribution:
 {code}
 jar tvf spark-assembly-1.0.0-hadoop1.0.4.jar| grep 'rdd/RDD' | grep 'saveAs'
 {code}
 I.e., it's not there.  It is in the hadoop2 distribution:
 {code}
 jar tvf spark-assembly-1.0.0-hadoop2.2.0.jar| grep 'rdd/RDD' | grep 'saveAs'
   1519 Mon May 26 07:29:54 PDT 2014 
 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$1.class
   1560 Mon May 26 07:29:54 PDT 2014 
 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$2.class
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3629) Improvements to YARN doc

2014-10-15 Thread ssj (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172416#comment-14172416
 ] 

ssj commented on SPARK-3629:


I have pushed a pull request on GitHub about it:
https://github.com/apache/spark/pull/2813

 Improvements to YARN doc
 

 Key: SPARK-3629
 URL: https://issues.apache.org/jira/browse/SPARK-3629
 Project: Spark
  Issue Type: Documentation
  Components: Documentation, YARN
Reporter: Matei Zaharia
  Labels: starter

 Right now this doc starts off with a big list of config options, and only 
 then tells you how to submit an app. It would be better to put that part and 
 the packaging part first, and the config options only at the end.
 In addition, the doc mentions yarn-cluster vs yarn-client as separate 
 masters, which is inconsistent with the help output from spark-submit (which 
 says to always use yarn).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3629) Improvements to YARN doc

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172444#comment-14172444
 ] 

Apache Spark commented on SPARK-3629:
-

User 'ssjssh' has created a pull request for this issue:
https://github.com/apache/spark/pull/2813

 Improvements to YARN doc
 

 Key: SPARK-3629
 URL: https://issues.apache.org/jira/browse/SPARK-3629
 Project: Spark
  Issue Type: Documentation
  Components: Documentation, YARN
Reporter: Matei Zaharia
  Labels: starter

 Right now this doc starts off with a big list of config options, and only 
 then tells you how to submit an app. It would be better to put that part and 
 the packaging part first, and the config options only at the end.
 In addition, the doc mentions yarn-cluster vs yarn-client as separate 
 masters, which is inconsistent with the help output from spark-submit (which 
 says to always use yarn).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1209) SparkHadoopUtil should not use package org.apache.hadoop

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172479#comment-14172479
 ] 

Apache Spark commented on SPARK-1209:
-

User 'srowen' has created a pull request for this issue:
https://github.com/apache/spark/pull/2814

 SparkHadoopUtil should not use package org.apache.hadoop
 

 Key: SPARK-1209
 URL: https://issues.apache.org/jira/browse/SPARK-1209
 Project: Spark
  Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Sandy Ryza
Assignee: Mark Grover

 It's private, so the change won't break compatibility



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2429) Hierarchical Implementation of KMeans

2014-10-15 Thread RJ Nowling (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172486#comment-14172486
 ] 

RJ Nowling commented on SPARK-2429:
---

Great to know! I'm glad that isn't a bottleneck.

Have you been able to benchmark each of the major steps?  Which steps are
most expensive?



 Hierarchical Implementation of KMeans
 -

 Key: SPARK-2429
 URL: https://issues.apache.org/jira/browse/SPARK-2429
 Project: Spark
  Issue Type: New Feature
  Components: MLlib
Reporter: RJ Nowling
Assignee: Yu Ishikawa
Priority: Minor
 Attachments: The Result of Benchmarking a Hierarchical Clustering.pdf


 Hierarchical clustering algorithms are widely used and would make a nice 
 addition to MLlib.  Clustering algorithms are useful for determining 
 relationships between clusters as well as offering faster assignment. 
 Discussion on the dev list suggested the following possible approaches:
 * Top down, recursive application of KMeans
 * Reuse DecisionTree implementation with different objective function
 * Hierarchical SVD
 It was also suggested that support for distance metrics other than Euclidean 
 such as negative dot or cosine are necessary.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3873) Scala style: check import ordering

2014-10-15 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172552#comment-14172552
 ] 

Marcelo Vanzin commented on SPARK-3873:
---

It doesn't seem like I can unassign bugs...

 Scala style: check import ordering
 --

 Key: SPARK-3873
 URL: https://issues.apache.org/jira/browse/SPARK-3873
 Project: Spark
  Issue Type: Sub-task
  Components: Project Infra
Reporter: Reynold Xin
Assignee: Marcelo Vanzin





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1209) SparkHadoopUtil should not use package org.apache.hadoop

2014-10-15 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172571#comment-14172571
 ] 

Sean Owen commented on SPARK-1209:
--

... and why wouldn't you, that's the title of the JIRA, oops. It's not that 
class that moves or even changes actually, and yes it should not move. Let me 
fix the title and fix my PR too. Maybe that's a more palatable change.

 SparkHadoopUtil should not use package org.apache.hadoop
 

 Key: SPARK-1209
 URL: https://issues.apache.org/jira/browse/SPARK-1209
 Project: Spark
  Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Sandy Ryza
Assignee: Mark Grover

 It's private, so the change won't break compatibility



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172582#comment-14172582
 ] 

Josh Rosen commented on SPARK-3948:
---

That kernel bug seems like it could explain why the {{transferTo}} change 
caused problems in sort-based shuffle.  It looks like SPARK-3630 also describes 
some other places where the Kryo PARSING_ERROR(2) has occurred, so I'm going to 
try to figure out which of these other cases might also hit this {{transferTo}} 
code path.

In terms of a bug fix / workaround: maybe we can open the file in append mode, 
since that seems to solve the problem, and also add an internal configuration 
option to disable the transferTo code; this configuration / feature-flag would 
provide an escape hatch for users in case the append fix doesn't work.
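
A rough sketch of what that could look like (a sketch only, assuming a hypothetical 
{{useTransferTo}} flag read from the configuration; none of these names are from an 
actual patch):

{code}
import java.io.{File, FileInputStream, FileOutputStream}

object CopySketch {
  // Append the contents of `in` onto `out`, with an escape hatch that avoids
  // the NIO transferTo path entirely.
  def appendFile(in: File, out: File, useTransferTo: Boolean): Long = {
    if (useTransferTo) {
      // append = true, so the channel's position starts at the end of the file.
      val inChannel = new FileInputStream(in).getChannel
      val outChannel = new FileOutputStream(out, true).getChannel
      try {
        // A single call is enough for a small spill file; real code would loop
        // until inChannel.size() bytes have been transferred.
        inChannel.transferTo(0, inChannel.size(), outChannel)
      } finally {
        inChannel.close()
        outChannel.close()
      }
    } else {
      // Escape hatch: plain buffered stream copy, no zero-copy IO involved.
      val inStream = new FileInputStream(in)
      val outStream = new FileOutputStream(out, true)
      try {
        val buf = new Array[Byte](8192)
        var total = 0L
        var n = inStream.read(buf)
        while (n != -1) {
          outStream.write(buf, 0, n)
          total += n
          n = inStream.read(buf)
        }
        total
      } finally {
        inStream.close()
        outStream.close()
      }
    }
  }
}
{code}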

 Sort-based shuffle can lead to assorted stream-corruption exceptions
 

 Key: SPARK-3948
 URL: https://issues.apache.org/jira/browse/SPARK-3948
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao

 Several exceptions occurred when running TPC-DS queries against latest master 
 branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
 deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
 these exceptions are gone when we changed to hash-based shuffle.
 With deep investigation, we found that some shuffle output file is 
 unexpectedly smaller than the others, as the log shows:
 {noformat}
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
 {noformat}
 As you can see the total file length of shuffle_6_11_11 is much smaller than 
 other same stage map output results.
 And we also dump the map outputs in map side to see if this small size output 
 is correct or not, below is the log:
 {noformat}
  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
  274722 262597 291290 272902 264941 270358 291005 295285 252482 
 287142 232617 259871 233734 241439 228897 234282 253834 235619 
 233803 255532 270739 253825 262087 266404 234273 250120 262983 
 257024 255947 254971 258908 247862 221613 258566 245399 251684 
 274843 226150 264278 245279 225656 235084 239466 212851 242245 
 218781 222191 215500 211548 234256 208601 204113 191923 217895 
 227020 215331 212313 223725 250876 256875 239276 266777 235520 
 237462 234063 242270 246825 255888 235937 236956 233099 264508 
 260303 233294 239061 254856 257475 230105 246553 260412 210355 
 211201 219572 206636 226866 209937 226618 218208 206255 248069 
 221717 222112 215734 248088 239207 246125 239056 241133 253091 
 246738 233128 242794 231606 255737 221123 252115 247286 229688 
 251087 250047 237579 263079 256251 238214 208641 201120 204009 
 200825 211965 200600 194492 226471 194887 226975 215072 206008 
 233288 222132 208860 219064 218162 237126 220465 201343 225711 
 232178 233786 212767 211462 213671 215853 227822 233782 214727 
 247001 228968 247413 222674 214241 184122 215643 207665 219079 
 215185 207718 212723 201613 216600 212591 208174 204195 208099 
 229079 230274 223373 214999 256626 228895 231821 383405 229646 
 220212 245495 245960 227556 213266 237203 203805 240509 239306 
 242365 218416 238487 219397 240026 251011 258369 255365 259811 
 283313 248450 264286 264562 257485 279459 249187 257609 274964 
 292369 273826
 {noformat}
 Here I dump the file name, length and each partition's length, obviously the 
 sum of all partition lengths is not equal to file length. So I think there 
 may be a situation paritionWriter in ExternalSorter not always append to the 
 end of previous written file, the file's content is overwritten in some 
 parts, and this lead to the exceptions I mentioned before.
 Also I changed the code of copyStream by disable transferTo, use the previous 
 one, all the issues are gone. So I think there maybe some flushing problems 
 in 

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172585#comment-14172585
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~jerryshao] great work!
I agree, append might be a workaround to consider (given the semantics of 
getChannel when the stream is opened with append).
On the other hand, since this piece of code (copyStreams) might also be used in a 
more general context, what about logging a warning in case position != 
initialPosition + size at the end of the transferTo loop, warning users that they 
should upgrade their kernel (and explicitly modifying the position as a workaround)?
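
Something like the following, perhaps (a sketch of the check being suggested; the 
method name and the warning message are illustrative only):

{code}
import java.nio.channels.FileChannel

object TransferToCheck {
  // Call this right after the transferTo loop in copyStreams.
  def verifyPosition(outChannel: FileChannel, initialPosition: Long, size: Long): Unit = {
    val expected = initialPosition + size
    if (outChannel.position() != expected) {
      System.err.println(
        s"WARN: transferTo left the channel position at ${outChannel.position()} " +
          s"instead of $expected; your kernel may need to be upgraded.")
      // Workaround: move the position explicitly so subsequent writes append
      // instead of overwriting earlier output.
      outChannel.position(expected)
    }
  }
}
{code}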



 Sort-based shuffle can lead to assorted stream-corruption exceptions
 

 Key: SPARK-3948
 URL: https://issues.apache.org/jira/browse/SPARK-3948
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao

 Several exceptions occurred when running TPC-DS queries against latest master 
 branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
 deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
 these exceptions are gone when we changed to hash-based shuffle.
 With deep investigation, we found that some shuffle output file is 
 unexpectedly smaller than the others, as the log shows:
 {noformat}
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
 {noformat}
 As you can see the total file length of shuffle_6_11_11 is much smaller than 
 other same stage map output results.
 And we also dump the map outputs in map side to see if this small size output 
 is correct or not, below is the log:
 {noformat}
  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
  274722 262597 291290 272902 264941 270358 291005 295285 252482 
 287142 232617 259871 233734 241439 228897 234282 253834 235619 
 233803 255532 270739 253825 262087 266404 234273 250120 262983 
 257024 255947 254971 258908 247862 221613 258566 245399 251684 
 274843 226150 264278 245279 225656 235084 239466 212851 242245 
 218781 222191 215500 211548 234256 208601 204113 191923 217895 
 227020 215331 212313 223725 250876 256875 239276 266777 235520 
 237462 234063 242270 246825 255888 235937 236956 233099 264508 
 260303 233294 239061 254856 257475 230105 246553 260412 210355 
 211201 219572 206636 226866 209937 226618 218208 206255 248069 
 221717 222112 215734 248088 239207 246125 239056 241133 253091 
 246738 233128 242794 231606 255737 221123 252115 247286 229688 
 251087 250047 237579 263079 256251 238214 208641 201120 204009 
 200825 211965 200600 194492 226471 194887 226975 215072 206008 
 233288 222132 208860 219064 218162 237126 220465 201343 225711 
 232178 233786 212767 211462 213671 215853 227822 233782 214727 
 247001 228968 247413 222674 214241 184122 215643 207665 219079 
 215185 207718 212723 201613 216600 212591 208174 204195 208099 
 229079 230274 223373 214999 256626 228895 231821 383405 229646 
 220212 245495 245960 227556 213266 237203 203805 240509 239306 
 242365 218416 238487 219397 240026 251011 258369 255365 259811 
 283313 248450 264286 264562 257485 279459 249187 257609 274964 
 292369 273826
 {noformat}
 Here I dump the file name, the file length and each partition's length; clearly the 
 sum of all partition lengths does not equal the file length. So I think there may be 
 a situation where partitionWriter in ExternalSorter does not always append to the 
 end of the previously written data: parts of the file's content get overwritten, 
 and this leads to the exceptions I mentioned before.
 Also, when I changed copyStream to disable transferTo and use the previous 
 implementation, all the issues were gone. So I think there may be some flushing 
 problem in transferTo when the processed data is large.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To 

[jira] [Commented] (SPARK-3704) the types not match adding value form spark row to hive row in SparkSQLOperationManager

2014-10-15 Thread JIng Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172645#comment-14172645
 ] 

JIng Wang commented on SPARK-3704:
--

Will this fix be merged to branch-v1.1 for the next 1.1.x release?



 the types not match adding value form spark row to hive row in 
 SparkSQLOperationManager
 ---

 Key: SPARK-3704
 URL: https://issues.apache.org/jira/browse/SPARK-3704
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.1.0
Reporter: wangfei
 Fix For: 1.2.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3956) Python API for Distributed Matrix

2014-10-15 Thread Davies Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davies Liu updated SPARK-3956:
--
Priority: Minor  (was: Blocker)

 Python API for Distributed Matrix
 -

 Key: SPARK-3956
 URL: https://issues.apache.org/jira/browse/SPARK-3956
 Project: Spark
  Issue Type: New Feature
Reporter: Davies Liu
Assignee: Davies Liu
Priority: Minor

 Python API for distributed matrix



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application

2014-10-15 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172861#comment-14172861
 ] 

Marcelo Vanzin commented on SPARK-3174:
---

[~praveenseluka] I took a quick look at your design. I have a few comments:

* It ignores the shuffle, which is covered in Andrew's design. You can't scale 
down without figuring out what to do with the shuffle data of completed stages, 
since it may be reused. It's not just about cached blocks.

* Overall, I like the idea of having pluggable scaling algorithms, but I 
think it's too early for a public add/remove executor API. For example, 
standalone mode doesn't support adding or removing executors, AFAIK.

* The task runtime-based heuristic looks a bit weird. I think it needs to be 
more dynamic - e.g., actually consider the backlog of tasks vs. number of tasks 
that can be run concurrently on the current executors. Also, doubling the number 
of executors seems a bit like a sledgehammer - an exponential approach like 
Andrew has proposed might be able to reach similar results with lower overall 
system load.

I haven't looked at Andrew's PR yet (it's on my todo list), but I don't see a 
lot that is that much different in the two proposals. I agree that if it's 
possible it would be nice to keep the autoscaling code / interfaces as separate 
as possible from the current core, if only to make them easier to replace / 
refactor / disable, but then again, I haven't looked at Andrew's code yet.
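
For concreteness, a toy sketch of the backlog-driven, exponential ramp-up idea 
(all names and constants here are illustrative only, not taken from either proposal):

{code}
object ScalingSketch {
  // How many executors to ask for this round, given the current task backlog.
  def executorsToRequest(pendingTasks: Int,
                         runningExecutors: Int,
                         tasksPerExecutor: Int,
                         roundsBacklogged: Int): Int = {
    val capacity = runningExecutors * tasksPerExecutor
    if (pendingTasks <= capacity) {
      0 // no backlog: nothing to request
    } else {
      val needed =
        math.ceil((pendingTasks - capacity).toDouble / tasksPerExecutor).toInt
      // Start small and double the request each round the backlog persists,
      // instead of immediately doubling the whole set of executors.
      math.min(needed, 1 << math.min(roundsBacklogged, 10))
    }
  }
}
{code}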


 Provide elastic scaling within a Spark application
 --

 Key: SPARK-3174
 URL: https://issues.apache.org/jira/browse/SPARK-3174
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core, YARN
Affects Versions: 1.0.2
Reporter: Sandy Ryza
Assignee: Andrew Or
 Attachments: SPARK-3174design.pdf, SparkElasticScalingDesignB.pdf, 
 dynamic-scaling-executors-10-6-14.pdf


 A common complaint with Spark in a multi-tenant environment is that 
 applications have a fixed allocation that doesn't grow and shrink with their 
 resource needs.  We're blocked on YARN-1197 for dynamically changing the 
 resources within executors, but we can still allocate and discard whole 
 executors.
 It would be useful to have some heuristics that
 * Request more executors when many pending tasks are building up
 * Discard executors when they are idle
 See the latest design doc for more information.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3936) Incorrect result in GraphX BytecodeUtils with closures + class/object methods

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172867#comment-14172867
 ] 

Apache Spark commented on SPARK-3936:
-

User 'jegonzal' has created a pull request for this issue:
https://github.com/apache/spark/pull/2815

 Incorrect result in GraphX BytecodeUtils with closures + class/object methods
 -

 Key: SPARK-3936
 URL: https://issues.apache.org/jira/browse/SPARK-3936
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Reporter: Pedro Rodriguez
Assignee: Ankur Dave

 There seems to be a bug with the GraphX byte code inspection, specifically in 
 BytecodeUtils. 
 These are the unit tests I wrote to expose the problem:
 https://github.com/EntilZha/spark/blob/a3c38a8329545c034fae2458df134fa3829d08fb/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala#L93-L121
 The first two tests pass, the second two tests fail. This exposes a problem 
 with inspection of methods in closures, in this case within maps. 
 Specifically, it seems like there is a problem with inspection of non-inline 
 methods in a closure.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172895#comment-14172895
 ] 

Josh Rosen commented on SPARK-3948:
---

[~mridulm80], that explicit position-checking and modification idea is a clever 
workaround.

I think that this JIRA explains the cause of the stream-corruption issues that 
we've seen in the sort-based shuffle code, but after some investigation I think 
that it doesn't explain the similar errors that we've seen when deserializing 
tasks that were broadcasted with TorrentBroadcast; I'll keep investigating this.

 Sort-based shuffle can lead to assorted stream-corruption exceptions
 

 Key: SPARK-3948
 URL: https://issues.apache.org/jira/browse/SPARK-3948
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao

 Several exceptions occurred when running TPC-DS queries against latest master 
 branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
 deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
 these exceptions are gone when we changed to hash-based shuffle.
 With deep investigation, we found that some shuffle output file is 
 unexpectedly smaller than the others, as the log shows:
 {noformat}
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
 shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
 {noformat}
 As you can see the total file length of shuffle_6_11_11 is much smaller than 
 other same stage map output results.
 And we also dump the map outputs in map side to see if this small size output 
 is correct or not, below is the log:
 {noformat}
  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
  274722 262597 291290 272902 264941 270358 291005 295285 252482 
 287142 232617 259871 233734 241439 228897 234282 253834 235619 
 233803 255532 270739 253825 262087 266404 234273 250120 262983 
 257024 255947 254971 258908 247862 221613 258566 245399 251684 
 274843 226150 264278 245279 225656 235084 239466 212851 242245 
 218781 222191 215500 211548 234256 208601 204113 191923 217895 
 227020 215331 212313 223725 250876 256875 239276 266777 235520 
 237462 234063 242270 246825 255888 235937 236956 233099 264508 
 260303 233294 239061 254856 257475 230105 246553 260412 210355 
 211201 219572 206636 226866 209937 226618 218208 206255 248069 
 221717 222112 215734 248088 239207 246125 239056 241133 253091 
 246738 233128 242794 231606 255737 221123 252115 247286 229688 
 251087 250047 237579 263079 256251 238214 208641 201120 204009 
 200825 211965 200600 194492 226471 194887 226975 215072 206008 
 233288 222132 208860 219064 218162 237126 220465 201343 225711 
 232178 233786 212767 211462 213671 215853 227822 233782 214727 
 247001 228968 247413 222674 214241 184122 215643 207665 219079 
 215185 207718 212723 201613 216600 212591 208174 204195 208099 
 229079 230274 223373 214999 256626 228895 231821 383405 229646 
 220212 245495 245960 227556 213266 237203 203805 240509 239306 
 242365 218416 238487 219397 240026 251011 258369 255365 259811 
 283313 248450 264286 264562 257485 279459 249187 257609 274964 
 292369 273826
 {noformat}
 Here I dump the file name, the file length and each partition's length; clearly the 
 sum of all partition lengths does not equal the file length. So I think there may be 
 a situation where partitionWriter in ExternalSorter does not always append to the 
 end of the previously written data: parts of the file's content get overwritten, 
 and this leads to the exceptions I mentioned before.
 Also, when I changed copyStream to disable transferTo and use the previous 
 implementation, all the issues were gone. So I think there may be some flushing 
 problem in transferTo when the processed data is large.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: 

[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format

2014-10-15 Thread Fi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172917#comment-14172917
 ] 

Fi commented on SPARK-2883:
---

This sounds great!

Is this a Scala-only patch, or would it also somehow work in Python?

 Spark Support for ORCFile format
 

 Key: SPARK-2883
 URL: https://issues.apache.org/jira/browse/SPARK-2883
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, SQL
Reporter: Zhan Zhang
Priority: Blocker
 Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 
 pm jobtracker.png


 Verify the support of OrcInputFormat in spark, fix issues if exists and add 
 documentation of its usage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3630) Identify cause of Kryo+Snappy PARSING_ERROR

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172926#comment-14172926
 ] 

Josh Rosen commented on SPARK-3630:
---

I think that there may be multiple causes of these decompression errors:

*Errors in sort-based shuffle*: SPARK-3948 suggests that this is due to 
stream-corruption that's caused by an interaction between a Linux kernel bug 
and our use of {{transferTo}} for performing zero-copy IO when spilling shuffle 
data.

*Errors when deserializing broadcasted tasks*: this is probably also caused by 
stream-corruption, but the source of that corruption isn't clear yet.  I've 
opened SPARK-3958 to investigate TorrentBroadcast separately.

If you encounter a Kryo or Snappy decompression error that is _not_ covered by 
these two issues, please post a comment in this issue to let me know.

 Identify cause of Kryo+Snappy PARSING_ERROR
 ---

 Key: SPARK-3630
 URL: https://issues.apache.org/jira/browse/SPARK-3630
 Project: Spark
  Issue Type: Task
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Andrew Ash

 A recent GraphX commit caused non-deterministic exceptions in unit tests so 
 it was reverted (see SPARK-3400).
 Separately, [~aash] observed the same exception stacktrace in an 
 application-specific Kryo registrator:
 {noformat}
 com.esotericsoftware.kryo.KryoException: java.io.IOException: failed to 
 uncompress the chunk: PARSING_ERROR(2)
 com.esotericsoftware.kryo.io.Input.fill(Input.java:142) 
 com.esotericsoftware.kryo.io.Input.require(Input.java:169) 
 com.esotericsoftware.kryo.io.Input.readInt(Input.java:325) 
 com.esotericsoftware.kryo.io.Input.readFloat(Input.java:624) 
 com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:127)
  
 com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:117)
  
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) 
 com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
  
 com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
  
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
 ...
 {noformat}
 This ticket is to identify the cause of the exception in the GraphX commit so 
 the faulty commit can be fixed and merged back into master.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3630) Identify cause of Kryo+Snappy PARSING_ERROR

2014-10-15 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-3630:
--
 Target Version/s: 1.1.1, 1.2.0
Affects Version/s: 1.2.0
 Assignee: Josh Rosen

 Identify cause of Kryo+Snappy PARSING_ERROR
 ---

 Key: SPARK-3630
 URL: https://issues.apache.org/jira/browse/SPARK-3630
 Project: Spark
  Issue Type: Task
  Components: Spark Core
Affects Versions: 1.1.0, 1.2.0
Reporter: Andrew Ash
Assignee: Josh Rosen

 A recent GraphX commit caused non-deterministic exceptions in unit tests so 
 it was reverted (see SPARK-3400).
 Separately, [~aash] observed the same exception stacktrace in an 
 application-specific Kryo registrator:
 {noformat}
 com.esotericsoftware.kryo.KryoException: java.io.IOException: failed to 
 uncompress the chunk: PARSING_ERROR(2)
 com.esotericsoftware.kryo.io.Input.fill(Input.java:142) 
 com.esotericsoftware.kryo.io.Input.require(Input.java:169) 
 com.esotericsoftware.kryo.io.Input.readInt(Input.java:325) 
 com.esotericsoftware.kryo.io.Input.readFloat(Input.java:624) 
 com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:127)
  
 com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:117)
  
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) 
 com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
  
 com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
  
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
 ...
 {noformat}
 This ticket is to identify the cause of the exception in the GraphX commit so 
 the faulty commit can be fixed and merged back into master.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3959) SqlParser fails to parse literal -9223372036854775808 (Long.MinValue).

2014-10-15 Thread Kousuke Saruta (JIRA)
Kousuke Saruta created SPARK-3959:
-

 Summary: SqlParser fails to parse literal -9223372036854775808 
(Long.MinValue).
 Key: SPARK-3959
 URL: https://issues.apache.org/jira/browse/SPARK-3959
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Kousuke Saruta
Priority: Critical


SqlParser fails to parse -9223372036854775808 (Long.MinValue), so we cannot 
write queries such as the following.

{code}
SELECT value FROM someTable WHERE value < -9223372036854775808
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3960) We cannot apply unary minus only to literal.

2014-10-15 Thread Kousuke Saruta (JIRA)
Kousuke Saruta created SPARK-3960:
-

 Summary: We cannot apply unary minus only to literal.
 Key: SPARK-3960
 URL: https://issues.apache.org/jira/browse/SPARK-3960
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Kousuke Saruta
Priority: Critical


Because of the wrong syntax definition, we cannot apply unary minus only to 
literal. So, we cannot write such expressions.

{code}
-(value1 + value2)
-column



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3960) We cannot apply unary minus only to literal.

2014-10-15 Thread Kousuke Saruta (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kousuke Saruta updated SPARK-3960:
--
Description: 
Because of the wrong syntax definition, we cannot apply unary minus only to 
literal. So, we cannot write such expressions.

{code}
-(value1 + value2) // Parenthesized expressions
-column // Columns
-MAX(column) // Functions
{code}

  was:
Because of the wrong syntax definition, we cannot apply unary minus only to 
literal. So, we cannot write such expressions.

{code}
-(value1 + value2)
-column


 We cannot apply unary minus only to literal.
 

 Key: SPARK-3960
 URL: https://issues.apache.org/jira/browse/SPARK-3960
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Kousuke Saruta
Priority: Critical

 Because of the wrong syntax definition, we cannot apply unary minus only to 
 literal. So, we cannot write such expressions.
 {code}
 -(value1 + value2) // Parenthesized expressions
 -column // Columns
 -MAX(column) // Functions
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3960) We can apply unary minus only to literal.

2014-10-15 Thread Kousuke Saruta (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kousuke Saruta updated SPARK-3960:
--
Summary: We can apply unary minus only to literal.  (was: We cannot apply 
unary minus only to literal.)

 We can apply unary minus only to literal.
 -

 Key: SPARK-3960
 URL: https://issues.apache.org/jira/browse/SPARK-3960
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Kousuke Saruta
Priority: Critical

 Because of the wrong syntax definition, we cannot apply unary minus only to 
 literal. So, we cannot write such expressions.
 {code}
 -(value1 + value2) // Parenthesized expressions
 -column // Columns
 -MAX(column) // Functions
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172949#comment-14172949
 ] 

Josh Rosen commented on SPARK-3958:
---

Digging into this stacktrace in more detail:

Snappy-java's {{SnappyOutputStream}} writes its own 16-byte header at the 
beginning of the serialized output.  This header consists of an 8-byte magic 
value followed by two 4-byte version numbers.  This header is distinct from 
Snappy's own 6-byte magic number / header.

{{org.xerial.snappy.SnappyInputStream.readHeader}} is implemented like this (in 
Snappy-Java 1.1.1.3):

{code}
protected void readHeader() throws IOException {
    byte[] header = new byte[SnappyCodec.headerSize()];
    int readBytes = 0;
    while (readBytes < header.length) {
        int ret = in.read(header, readBytes, header.length - readBytes);
        if (ret == -1)
            break;
        readBytes += ret;
    }

    // Quick test of the header
    if (readBytes < header.length || header[0] != SnappyCodec.MAGIC_HEADER[0]) {
        // do the default uncompression
        readFully(header, readBytes);
        return;
    }

    SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header));
    if (codec.isValidMagicHeader()) {
        // The input data is compressed by SnappyOutputStream
        if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
            throw new IOException(String.format(
                    "compressed with imcompatible codec version %d. At least version %d is required",
                    codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
        }
    }
    else {
        // (probably) compressed by Snappy.compress(byte[])
        readFully(header, readBytes);
        return;
    }
}
{code}

It starts by attempting to read the 16-byte header.  The first {{while}} loop 
exits when we've either read the full header or the input stream was closed 
before a complete header could be read.  The following code checks whether the 
header is unexpectedly short or whether it doesn't match the snappy-java magic 
header.  In our case, we end up taking this branch and calling 
{{readFully(header, readBytes)}} in order to perform the default Snappy 
decompression.  This is the wrong branch to take (since our data was compressed 
with a SnappyOutputStream), leading to the PARSING_ERROR.

Based on this, I think that the input data to the SnappyInputStream is somehow 
being corrupted.  It's not obvious whether this corruption is causing the input 
data to be too short or whether the start of the stream has the wrong contents. 
 I'll keep digging and look into adding some size-checking assertions 
throughout our code.
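
To make the wrong-branch behavior concrete, here is a small sketch (not Spark code; 
it assumes snappy-java on the classpath) that compresses a buffer with 
{{SnappyOutputStream}}, clobbers the first header byte, and then lets 
{{SnappyInputStream}} take the raw-Snappy fallback path:

{code}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

object SnappyHeaderDemo {
  def main(args: Array[String]): Unit = {
    // Compress some bytes; SnappyOutputStream writes its stream header first.
    val raw = ("spark " * 1000).getBytes("UTF-8")
    val buf = new ByteArrayOutputStream()
    val snappyOut = new SnappyOutputStream(buf)
    snappyOut.write(raw)
    snappyOut.close()
    val compressed = buf.toByteArray

    // Corrupt the first magic byte, so readHeader() no longer recognizes the
    // snappy-java header and falls through to the "default uncompression" branch.
    compressed(0) = 0

    try {
      val in = new SnappyInputStream(new ByteArrayInputStream(compressed))
      val out = new Array[Byte](raw.length)
      in.read(out)
      println("read unexpectedly succeeded")
    } catch {
      // Expected: the fallback treats framed data as raw Snappy and fails,
      // surfacing as an IOException much like the PARSING_ERROR above.
      case e: IOException => println("read failed as expected: " + e)
    }
  }
}
{code}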

 Possible stream-corruption issues in TorrentBroadcast
 -

 Key: SPARK-3958
 URL: https://issues.apache.org/jira/browse/SPARK-3958
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0, 1.2.0
Reporter: Josh Rosen
Assignee: Josh Rosen

 TorrentBroadcast deserialization sometimes fails with decompression errors, 
 which are most likely caused by stream-corruption exceptions.  For example, 
 this can manifest itself as a Snappy PARSING_ERROR when deserializing a 
 broadcasted task:
 {code}
 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered 
 locally
 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast 
 variable 8
 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 
 took 5.3433E-5 s
 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 
 18)
 java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
   at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
   at 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
   at 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
   at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
   at 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
   at 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
   at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at 

[jira] [Commented] (SPARK-3960) We can apply unary minus only to literal.

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172986#comment-14172986
 ] 

Apache Spark commented on SPARK-3960:
-

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/2816

 We can apply unary minus only to literal.
 -

 Key: SPARK-3960
 URL: https://issues.apache.org/jira/browse/SPARK-3960
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Kousuke Saruta
Priority: Critical

 Because of the wrong syntax definition, we cannot apply unary minus only to 
 literal. So, we cannot write such expressions.
 {code}
 -(value1 + value2) // Parenthesized expressions
 -column // Columns
 -MAX(column) // Functions
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3937) Unsafe memory access inside of Snappy library

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173025#comment-14173025
 ] 

Josh Rosen commented on SPARK-3937:
---

Another occurrence of this problem, running a recent-ish version of master 
(1.2):

{code}
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
at 
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:384)
at 
java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2293)
at 
java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2586)
at 
java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2596)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1318)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{code}

 Unsafe memory access inside of Snappy library
 -

 Key: SPARK-3937
 URL: https://issues.apache.org/jira/browse/SPARK-3937
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Patrick Wendell

 This was observed on master between Spark 1.1 and 1.2. Unfortunately I don't 
 have much information about this other than the stack trace. However, it was 
 concerning enough I figured I should post it.
 {code}
 java.lang.InternalError: a fault occurred in a recent unsafe memory access 
 operation in compiled Java code
 org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
 org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
 org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
 
 org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:355)
 
 org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
 org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
 
 java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
 
 java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2712)
 
 java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2742)
 java.io.ObjectInputStream.readArray(ObjectInputStream.java:1687)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
 

[jira] [Updated] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2014-10-15 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-3958:
--
Target Version/s: 1.2.0  (was: 1.1.1, 1.2.0)

 Possible stream-corruption issues in TorrentBroadcast
 -

 Key: SPARK-3958
 URL: https://issues.apache.org/jira/browse/SPARK-3958
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Josh Rosen
Assignee: Josh Rosen

 TorrentBroadcast deserialization sometimes fails with decompression errors, 
 which are most likely caused by stream-corruption exceptions.  For example, 
 this can manifest itself as a Snappy PARSING_ERROR when deserializing a 
 broadcasted task:
 {code}
 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered 
 locally
 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast 
 variable 8
 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 
 took 5.3433E-5 s
 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 
 18)
 java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
   at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
   at 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
   at 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
   at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
   at 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
   at 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
   at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 {code}
 SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo 
 and Snappy deserialization errors.  This ticket is for a more 
 narrowly-focused exploration of the TorrentBroadcast version of these errors, 
 since the similar errors that we've seen in sort-based shuffle seem to be 
 explained by a different cause (see SPARK-3948).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2014-10-15 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-3958:
--
Affects Version/s: (was: 1.1.0)

Removing 1.1.0 as an affected version for now, since the stacktrace that I 
posted here was from a recent build of master (1.2).  If anyone can reproduce 
this in branch-1.1 or Spark 1.1.0, please let me know.

 Possible stream-corruption issues in TorrentBroadcast
 -

 Key: SPARK-3958
 URL: https://issues.apache.org/jira/browse/SPARK-3958
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Josh Rosen
Assignee: Josh Rosen

 TorrentBroadcast deserialization sometimes fails with decompression errors, 
 which are most likely caused by stream-corruption exceptions.  For example, 
 this can manifest itself as a Snappy PARSING_ERROR when deserializing a 
 broadcasted task:
 {code}
 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered 
 locally
 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast 
 variable 8
 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 
 took 5.3433E-5 s
 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 
 18)
 java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
   at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
   at 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
   at 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
   at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
   at 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
   at 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
   at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 {code}
 SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo 
 and Snappy deserialization errors.  This ticket is for a more 
 narrowly-focused exploration of the TorrentBroadcast version of these errors, 
 since the similar errors that we've seen in sort-based shuffle seem to be 
 explained by a different cause (see SPARK-3948).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173154#comment-14173154
 ] 

Josh Rosen commented on SPARK-3958:
---

I think that I can safely rule out problems in TorrentBroadcast's 
(de-)blockification code: I used ScalaCheck to write some tests to ensure that 
blockifyObject and unblockifyObject are inverses, plus a similar test for 
ByteArrayChunkOutputStream: 
https://github.com/JoshRosen/spark/commit/413be7f6a8d4eb14c69c7db87e2564ed4d776c42?diff=unified
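
For reference, the kind of round-trip property such a test checks looks roughly like the sketch below (the blockify/unBlockify bodies here are simplified stand-ins, not the actual TorrentBroadcast code):
{code}
import org.scalacheck.Prop.forAll
import org.scalacheck.Properties

// Simplified stand-ins for blockify/unBlockify: split a byte array into
// fixed-size blocks, then reassemble it.
object BlockifyRoundTrip extends Properties("blockify round-trip") {
  def blockify(bytes: Array[Byte], blockSize: Int): Array[Array[Byte]] =
    bytes.grouped(blockSize).map(_.toArray).toArray

  def unBlockify(blocks: Array[Array[Byte]]): Array[Byte] =
    blocks.flatten

  // Property: un-blockifying the blockified input must return the original bytes.
  property("unBlockify(blockify(x)) == x") = forAll { (bytes: Array[Byte]) =>
    unBlockify(blockify(bytes, 4096)).toSeq == bytes.toSeq
  }
}
{code}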

 Possible stream-corruption issues in TorrentBroadcast
 -

 Key: SPARK-3958
 URL: https://issues.apache.org/jira/browse/SPARK-3958
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Josh Rosen
Assignee: Josh Rosen

 TorrentBroadcast deserialization sometimes fails with decompression errors, 
 which are most likely caused by stream-corruption exceptions.  For example, 
 this can manifest itself as a Snappy PARSING_ERROR when deserializing a 
 broadcasted task:
 {code}
 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered 
 locally
 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast 
 variable 8
 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 
 took 5.3433E-5 s
 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 
 18)
 java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
   at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
   at 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
   at 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
   at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
   at 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
   at 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
   at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 {code}
 SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo 
 and Snappy deserialization errors.  This ticket is for a more 
 narrowly-focused exploration of the TorrentBroadcast version of these errors, 
 since the similar errors that we've seen in sort-based shuffle seem to be 
 explained by a different cause (see SPARK-3948).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3955) Different versions between jackson-mapper-asl and jackson-core-asl

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173166#comment-14173166
 ] 

Apache Spark commented on SPARK-3955:
-

User 'jongyoul' has created a pull request for this issue:
https://github.com/apache/spark/pull/2818

 Different versions between jackson-mapper-asl and jackson-core-asl
 --

 Key: SPARK-3955
 URL: https://issues.apache.org/jira/browse/SPARK-3955
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 1.1.0
Reporter: Jongyoul Lee

 In the parent pom.xml, specified a version of jackson-mapper-asl. This is 
 used by sql/hive/pom.xml. When mvn assembly runs, however, jackson-mapper-asl 
 is not same as jackson-core-asl. This is because other libraries use several 
 versions of jackson, so other version of jackson-core-asl is assembled. 
 Simply, fix this problem if pom.xml has a specific version information of 
 jackson-core-asl. If it's not set, a version 1.9.11 is merged info 
 assembly.jar and we cannot use jackson library properly.
 {code}
 [INFO] Including org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8 in the 
 shaded jar.
 [INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.11 in the 
 shaded jar.
 {code}
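 (For anyone hitting this from an application build rather than the Spark assembly itself, a sketch of the analogous fix in an sbt build is to pin both artifacts to one version; the version below is illustrative, and the actual fix in the pull request is against the Maven poms:)
 {code}
 // build.sbt fragment: pin jackson-mapper-asl and jackson-core-asl to the same
 // version so the assembled jar does not mix 1.8.x and 1.9.x classes.
 dependencyOverrides ++= Set(
   "org.codehaus.jackson" % "jackson-mapper-asl" % "1.8.8",
   "org.codehaus.jackson" % "jackson-core-asl"   % "1.8.8"
 )
 {code}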



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3961) Python API for mllib.feature

2014-10-15 Thread Davies Liu (JIRA)
Davies Liu created SPARK-3961:
-

 Summary: Python API for mllib.feature
 Key: SPARK-3961
 URL: https://issues.apache.org/jira/browse/SPARK-3961
 Project: Spark
  Issue Type: New Feature
Reporter: Davies Liu


Add completed API for mllib.feature



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3961) Python API for mllib.feature

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173172#comment-14173172
 ] 

Apache Spark commented on SPARK-3961:
-

User 'davies' has created a pull request for this issue:
https://github.com/apache/spark/pull/2819

 Python API for mllib.feature
 

 Key: SPARK-3961
 URL: https://issues.apache.org/jira/browse/SPARK-3961
 Project: Spark
  Issue Type: New Feature
Reporter: Davies Liu
Assignee: Davies Liu

 Add completed API for mllib.feature



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2014-10-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173199#comment-14173199
 ] 

Saisai Shao commented on SPARK-3958:


Hi Josh, have you tried another compression codec, such as LZO, to narrow down the problem?
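
(For reference, switching the codec Spark uses for broadcast and shuffle blocks is a one-line configuration change; a sketch, assuming a small standalone test job -- note that Spark ships an LZF codec rather than LZO:)
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Re-run the failing workload with a non-Snappy block codec to see whether
// the corruption is codec-specific. The full class name works across 1.x versions.
val conf = new SparkConf()
  .setAppName("broadcast-codec-test")
  .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
val sc = new SparkContext(conf)
{code}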

 Possible stream-corruption issues in TorrentBroadcast
 -

 Key: SPARK-3958
 URL: https://issues.apache.org/jira/browse/SPARK-3958
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Josh Rosen
Assignee: Josh Rosen

 TorrentBroadcast deserialization sometimes fails with decompression errors, 
 which are most likely caused by stream-corruption exceptions.  For example, 
 this can manifest itself as a Snappy PARSING_ERROR when deserializing a 
 broadcasted task:
 {code}
 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered 
 locally
 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast 
 variable 8
 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 
 took 5.3433E-5 s
 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 
 18)
 java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
   at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
   at 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
   at 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
   at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
   at 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
   at 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
   at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 {code}
 SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo 
 and Snappy deserialization errors.  This ticket is for a more 
 narrowly-focused exploration of the TorrentBroadcast version of these errors, 
 since the similar errors that we've seen in sort-based shuffle seem to be 
 explained by a different cause (see SPARK-3948).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2014-10-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173207#comment-14173207
 ] 

Josh Rosen commented on SPARK-3958:
---

Hi [~jerryshao],

I don't have a reliable reproduction for this issue yet, so I haven't tried 
switching compression schemes or broadcast implementations.  I'm working with 
the user who provided this stack trace to see if we can get more logs to 
provide additional context.

 Possible stream-corruption issues in TorrentBroadcast
 -

 Key: SPARK-3958
 URL: https://issues.apache.org/jira/browse/SPARK-3958
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Josh Rosen
Assignee: Josh Rosen

 TorrentBroadcast deserialization sometimes fails with decompression errors, 
 which are most likely caused by stream-corruption exceptions.  For example, 
 this can manifest itself as a Snappy PARSING_ERROR when deserializing a 
 broadcasted task:
 {code}
 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered 
 locally
 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast 
 variable 8
 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 
 took 5.3433E-5 s
 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 
 18)
 java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
   at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
   at 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
   at 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
   at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
   at 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
   at 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
   at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 {code}
 SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo 
 and Snappy deserialization errors.  This ticket is for a more 
 narrowly-focused exploration of the TorrentBroadcast version of these errors, 
 since the similar errors that we've seen in sort-based shuffle seem to be 
 explained by a different cause (see SPARK-3948).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3720) support ORC in spark sql

2014-10-15 Thread Zhan Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhan Zhang updated SPARK-3720:
--
Attachment: orc.diff

This is the diff for ORC file support. I have also been working on this item in parallel for SPARK-2883. Since there is already a PR open for this JIRA, I am attaching my diff here and will work with wangfei to consolidate our work on this feature. A detailed description follows.

1. Basic operators: saveAsOrcFile and OrcFile. The former saves a table as an ORC-format file, and the latter imports an ORC-format file into a Spark SQL table.
2. Column pruning.
3. Self-contained schema support: the ORC support is fully functional independent of the Hive metastore; the table schema is maintained by the ORC file itself.
4. To enable ORC support, users need to {{import org.apache.spark.sql.hive.orc._}} to bring it into context.
5. ORC files are operated on through HiveContext; the only reason is packaging, since we don't want to bring a Hive dependency into Spark SQL. Note that ORC operations do not rely on the Hive metastore.
6. It supports the full range of Spark SQL data types, including complex and nested types such as lists and sequences.
7. Because the feature is exposed through HiveContext, the SQL parser used is actually the Hive parser.

Hive 0.13.1 support: with minor changes, once Spark's Hive dependency is upgraded to 0.13.1,
1. ORC can support different compression methods, e.g., SNAPPY, LZO, ZLIB, and NONE
2. predicate pushdown
Following is an example of using an ORC file; from the user's perspective it is almost identical to the Parquet support.
import org.apache.spark.sql.hive.orc._
val ctx = new org.apache.spark.sql.hive.HiveContext(sc)
val people = sc.textFile("examples/src/main/resources/people.txt")
val schemaString = "name age"
import org.apache.spark.sql._
val schema = StructType(schemaString.split(" ").map(fieldName =>
  StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleSchemaRDD = ctx.applySchema(rowRDD, schema)
peopleSchemaRDD.registerTempTable("people")
val results = ctx.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)
peopleSchemaRDD.saveAsOrcFile("people.orc")
val orcFile = ctx.orcFile("people.orc")
orcFile.registerTempTable("orcFile")
val teenagers = ctx.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

 support ORC in spark sql
 

 Key: SPARK-3720
 URL: https://issues.apache.org/jira/browse/SPARK-3720
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 1.1.0
Reporter: wangfei
 Attachments: orc.diff


 The Optimized Row Columnar (ORC) file format provides a highly efficient way to store data on HDFS. The ORC file format has many advantages, such as:
 1. a single file as the output of each task, which reduces the NameNode's load
 2. Hive type support including datetime, decimal, and the complex types (struct, list, map, and union)
 3. light-weight indexes stored within the file
    - skip row groups that don't pass predicate filtering
    - seek to a given row
 4. block-mode compression based on data type
    - run-length encoding for integer columns
    - dictionary encoding for string columns
 5. concurrent reads of the same file using separate RecordReaders
 6. ability to split files without scanning for markers
 7. bound the amount of memory needed for reading or writing
 8. metadata stored using Protocol Buffers, which allows addition and removal of fields
 Spark SQL already supports Parquet; supporting ORC as well gives people more options.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2883) Spark Support for ORCFile format

2014-10-15 Thread Zhan Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhan Zhang updated SPARK-2883:
--
Attachment: orc.diff

Just for completion, I also attach the diff here besides spark-3720. 

 Spark Support for ORCFile format
 

 Key: SPARK-2883
 URL: https://issues.apache.org/jira/browse/SPARK-2883
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, SQL
Reporter: Zhan Zhang
Priority: Blocker
 Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 
 pm jobtracker.png, orc.diff


 Verify the support of OrcInputFormat in spark, fix issues if exists and add 
 documentation of its usage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format

2014-10-15 Thread Zhan Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173269#comment-14173269
 ] 

Zhan Zhang commented on SPARK-2883:
---

It is a Scala patch, and I am not very familiar with the Python support. If I get a chance I will take a look, but for now I am still focused on the Scala support itself. If you have a chance, please give it a try and provide your feedback. This patch is not exhaustively tested, but I did test the basic functionality. For collaboration reasons I uploaded the patch early, so that wangfei (working on SPARK-3720) and I can consolidate our work and collaborate to fully support the ORC format.

 Spark Support for ORCFile format
 

 Key: SPARK-2883
 URL: https://issues.apache.org/jira/browse/SPARK-2883
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, SQL
Reporter: Zhan Zhang
Priority: Blocker
 Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 
 pm jobtracker.png, orc.diff


 Verify the support of OrcInputFormat in spark, fix issues if exists and add 
 documentation of its usage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1209) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop

2014-10-15 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173284#comment-14173284
 ] 

Patrick Wendell commented on SPARK-1209:


Got it! Yeah, so anything that is internal would of course be great to move to the Spark namespace if it doesn't need to be in the Hadoop one.

 SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
 --

 Key: SPARK-1209
 URL: https://issues.apache.org/jira/browse/SPARK-1209
 Project: Spark
  Issue Type: Bug
Affects Versions: 0.9.0
Reporter: Sandy Ryza
Assignee: Mark Grover

 It's private, so the change won't break compatibility



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3954) Optimization to FileInputDStream

2014-10-15 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-3954:
---
Summary: Optimization to FileInputDStream  (was:  promote the speed of 
convert files to RDDS)

 Optimization to FileInputDStream
 

 Key: SPARK-3954
 URL: https://issues.apache.org/jira/browse/SPARK-3954
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0.0, 1.1.0
Reporter: 宿荣全

 When converting files to RDDs there are 3 loops over the file sequence in the Spark source.
 The loops over the file sequence:
 1. files.map(...)
 2. files.zip(fileRDDs)
 3. files-size.foreach
 This is very time consuming when there are lots of files, so I made the following correction:
 3 loops over the file sequence => only one loop
 Spark source code:
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
     files.zip(fileRDDs).foreach { case (file, rdd) => {
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
     }}
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 // 
 ---
 modified code:
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
       rdd
     }
     new UnionRDD(context.sparkContext, fileRDDs)
   }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3954) promote the speed of convert files to RDDS

2014-10-15 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-3954:
---
Component/s: (was: Input/Output)
 Streaming

  promote the speed of convert files to RDDS
 ---

 Key: SPARK-3954
 URL: https://issues.apache.org/jira/browse/SPARK-3954
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0.0, 1.1.0
Reporter: 宿荣全

 When converting files to RDDs there are 3 loops over the file sequence in the Spark source.
 The loops over the file sequence:
 1. files.map(...)
 2. files.zip(fileRDDs)
 3. files-size.foreach
 This is very time consuming when there are lots of files, so I made the following correction:
 3 loops over the file sequence => only one loop
 Spark source code:
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
     files.zip(fileRDDs).foreach { case (file, rdd) => {
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
     }}
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 // 
 ---
 modified code:
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
       rdd
     }
     new UnionRDD(context.sparkContext, fileRDDs)
   }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3951) Make the external-* jars fat jars

2014-10-15 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173289#comment-14173289
 ] 

Patrick Wendell commented on SPARK-3951:


I think the best solution here is actually to mark spark as provided in these 
external library poms. That way users can link against them and build their own 
assemblies that won't include Spark itself. This is consistent with the way 
that users in general build Spark applications.
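
(To illustrate what this means for a user application build -- roughly the sbt equivalent of the resulting pom layout, with illustrative versions: Spark itself is provided, while the external connector is a normal dependency that ends up in the user's assembly jar.)
{code}
// build.sbt fragment for a user application (versions illustrative):
// Spark is provided by the cluster at runtime and stays out of the assembly
// jar, while the external connector is bundled like any other dependency.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.0"
)
{code}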

 Make the external-* jars fat jars
 -

 Key: SPARK-3951
 URL: https://issues.apache.org/jira/browse/SPARK-3951
 Project: Spark
  Issue Type: Bug
Reporter: Hari Shreedharan

 I am not sure this is the right solution to the issue of having to pull in the external jars and all of their direct and transitive dependencies, but building them as fat jars would definitely solve it, though not in a very elegant way.
 If we build the fat jar, the user can simply deploy that jar and not worry about all of the dependencies.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3962) Mark spark dependency as provided in external libraries

2014-10-15 Thread Patrick Wendell (JIRA)
Patrick Wendell created SPARK-3962:
--

 Summary: Mark spark dependency as provided in external libraries
 Key: SPARK-3962
 URL: https://issues.apache.org/jira/browse/SPARK-3962
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Patrick Wendell
Priority: Blocker


Right now there is not an easy way for users to link against the external 
streaming libraries and not accidentally pull Spark into their assembly jar. We 
should mark Spark as provided in the external connector pom's so that user 
applications can simply include those like any other dependency in the user's 
jar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3962) Mark spark dependency as provided in external libraries

2014-10-15 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-3962:
---
Description: 
Right now there is not an easy way for users to link against the external 
streaming libraries and not accidentally pull Spark into their assembly jar. We 
should mark Spark as provided in the external connector pom's so that user 
applications can simply include those like any other dependency in the user's 
jar.

This is also the best format for third-party libraries that depend on Spark (of 
which there will eventually be many) so it would be nice for our own build to 
conform to this nicely.

  was:Right now there is not an easy way for users to link against the external 
streaming libraries and not accidentally pull Spark into their assembly jar. We 
should mark Spark as provided in the external connector pom's so that user 
applications can simply include those like any other dependency in the user's 
jar.


 Mark spark dependency as provided in external libraries
 -

 Key: SPARK-3962
 URL: https://issues.apache.org/jira/browse/SPARK-3962
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Patrick Wendell
Priority: Blocker

 Right now there is not an easy way for users to link against the external 
 streaming libraries and not accidentally pull Spark into their assembly jar. 
 We should mark Spark as provided in the external connector pom's so that 
 user applications can simply include those like any other dependency in the 
 user's jar.
 This is also the best format for third-party libraries that depend on Spark 
 (of which there will eventually be many) so it would be nice for our own 
 build to conform to this nicely.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-1561) sbt/sbt assembly generates too many local files

2014-10-15 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell resolved SPARK-1561.

Resolution: Not a Problem

Does not seem to be an issue any more - resolving per comment from Sean.

 sbt/sbt assembly generates too many local files
 ---

 Key: SPARK-1561
 URL: https://issues.apache.org/jira/browse/SPARK-1561
 Project: Spark
  Issue Type: Improvement
Affects Versions: 1.0.0
Reporter: Xiangrui Meng

 Running `find ./ | wc -l` after `sbt/sbt assembly` returned 564365.
 This hits the default inode limit of an 8GB ext filesystem (the default volume size for an EC2 instance), which means you can do nothing after `sbt/sbt assembly` on such a partition.
 Most of the small files are under assembly/target/streams and the same folder 
 under examples/.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3431) Parallelize execution of tests

2014-10-15 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173300#comment-14173300
 ] 

Patrick Wendell commented on SPARK-3431:


[~srowen] - just wondering, is it trivial to parallelize the tests in maven at 
the granularity of test suites?

 Parallelize execution of tests
 --

 Key: SPARK-3431
 URL: https://issues.apache.org/jira/browse/SPARK-3431
 Project: Spark
  Issue Type: Improvement
  Components: Build
Reporter: Nicholas Chammas

 Running all the tests in {{dev/run-tests}} takes up to 2 hours. A common 
 strategy to cut test time down is to parallelize the execution of the tests. 
 Doing that may in turn require some prerequisite changes to be made to how 
 certain tests run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3963) Support getting task-scoped properties from TaskContext

2014-10-15 Thread Patrick Wendell (JIRA)
Patrick Wendell created SPARK-3963:
--

 Summary: Support getting task-scoped properties from TaskContext
 Key: SPARK-3963
 URL: https://issues.apache.org/jira/browse/SPARK-3963
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Patrick Wendell


This is a proposal for a minor feature. Given stabilization of the TaskContext 
API, it would be nice to have a mechanism for Spark jobs to access properties 
that are defined based on task-level scope by Spark RDD's. I'd like to propose 
adding a simple properties hash map with some standard spark properties that 
users can access. Later it would be nice to support users setting these 
properties, but for now to keep it simple in 1.2. I'd prefer users not be able 
to set them.

The main use case is providing the file name from Hadoop RDD's, a very common 
request. But I'd imagine us using this for other things later on. We could also 
use this to expose some of the taskMetrics, such as e.g. the input bytes.

{code}
val data = sc.textFile("s3n//..2014/*/*/*.json")
data.mapPartitions { 
  val fileName = TaskContext.get.getProperty(TaskContext.HADOOP_FILE_NAME)
  val parts = fileName.split("/")
  val (year, month, day) = (parts(3), parts(4), parts(5))
}
{code}

Internally we'd have a method called setProperty, but this wouldn't be exposed 
initially.
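
(A minimal sketch of what that could look like; the class and key names below are assumptions for illustration, not the final API:)
{code}
import scala.collection.mutable

// Hypothetical task-scoped property map. Spark-internal code (e.g. the Hadoop
// RDD when it starts reading a split) would call setProperty; user code would
// only ever read values through getProperty.
class TaskProperties {
  private val props = mutable.HashMap[String, String]()

  // Inside Spark this would be private[spark], i.e. not settable by users.
  def setProperty(key: String, value: String): Unit = {
    props(key) = value
  }

  def getProperty(key: String): String = props.getOrElse(key, null)
}

object TaskProperties {
  // Example of a standard key, analogous to the proposed TaskContext.HADOOP_FILE_NAME.
  val HADOOP_FILE_NAME = "spark.task.hadoopFileName"
}
{code}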



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3963) Support getting task-scoped properties from TaskContext

2014-10-15 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-3963:
---
Description: 
This is a proposal for a minor feature. Given stabilization of the TaskContext 
API, it would be nice to have a mechanism for Spark jobs to access properties 
that are defined based on task-level scope by Spark RDD's. I'd like to propose 
adding a simple properties hash map with some standard spark properties that 
users can access. Later it would be nice to support users setting these 
properties, but for now to keep it simple in 1.2. I'd prefer users not be able 
to set them.

The main use case is providing the file name from Hadoop RDD's, a very common 
request. But I'd imagine us using this for other things later on. We could also 
use this to expose some of the taskMetrics, such as e.g. the input bytes.

{code}
val data = sc.textFile("s3n//..2014/*/*/*.json")
data.mapPartitions { 
  val tc = TaskContext.get
  val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME)
  val parts = fileName.split("/")
  val (year, month, day) = (parts(3), parts(4), parts(5))
  ...
}
{code}

Internally we'd have a method called setProperty, but this wouldn't be exposed 
initially.

  was:
This is a proposal for a minor feature. Given stabilization of the TaskContext 
API, it would be nice to have a mechanism for Spark jobs to access properties 
that are defined based on task-level scope by Spark RDD's. I'd like to propose 
adding a simple properties hash map with some standard spark properties that 
users can access. Later it would be nice to support users setting these 
properties, but for now to keep it simple in 1.2. I'd prefer users not be able 
to set them.

The main use case is providing the file name from Hadoop RDD's, a very common 
request. But I'd imagine us using this for other things later on. We could also 
use this to expose some of the taskMetrics, such as e.g. the input bytes.

{code}
val data = sc.textFile("s3n//..2014/*/*/*.json")
data.mapPartitions { 
  val fileName = TaskContext.get.getProperty(TaskContext.HADOOP_FILE_NAME)
  val parts = fileName.split("/")
  val (year, month, day) = (parts(3), parts(4), parts(5))
  ...
}
{code}

Internally we'd have a method called setProperty, but this wouldn't be exposed 
initially.


 Support getting task-scoped properties from TaskContext
 ---

 Key: SPARK-3963
 URL: https://issues.apache.org/jira/browse/SPARK-3963
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Patrick Wendell

 This is a proposal for a minor feature. Given stabilization of the 
 TaskContext API, it would be nice to have a mechanism for Spark jobs to 
 access properties that are defined based on task-level scope by Spark RDD's. 
 I'd like to propose adding a simple properties hash map with some standard 
 spark properties that users can access. Later it would be nice to support 
 users setting these properties, but for now to keep it simple in 1.2. I'd 
 prefer users not be able to set them.
 The main use case is providing the file name from Hadoop RDD's, a very common 
 request. But I'd imagine us using this for other things later on. We could 
 also use this to expose some of the taskMetrics, such as e.g. the input bytes.
 {code}
 val data = sc.textFile("s3n//..2014/*/*/*.json")
 data.mapPartitions { 
   val tc = TaskContext.get
   val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME)
   val parts = fileName.split("/")
   val (year, month, day) = (parts(3), parts(4), parts(5))
   ...
 }
 {code}
 Internally we'd have a method called setProperty, but this wouldn't be 
 exposed initially.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3964) Python API for Hypothesis testing

2014-10-15 Thread Davies Liu (JIRA)
Davies Liu created SPARK-3964:
-

 Summary: Python API for Hypothesis testing 
 Key: SPARK-3964
 URL: https://issues.apache.org/jira/browse/SPARK-3964
 Project: Spark
  Issue Type: New Feature
  Components: MLlib
Reporter: Davies Liu


Python API for Hypothesis testing 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3965) Spark assembly for hadoop2 contains avro-mapred for hadoop1

2014-10-15 Thread David Jacot (JIRA)
David Jacot created SPARK-3965:
--

 Summary: Spark assembly for hadoop2 contains avro-mapred for 
hadoop1
 Key: SPARK-3965
 URL: https://issues.apache.org/jira/browse/SPARK-3965
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 1.1.0, 1.0.2
 Environment: hadoop2, HDP2.1
Reporter: David Jacot


When building Spark assembly for hadoop2, org.apache.avro:avro-mapred for 
hadoop1 is picked and added to the assembly which leads to following exception 
at runtime.

{code}
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at 
org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:111)
...
{code}

The patch for SPARK-3039 works well at compile time but artefact's classifier 
is not applied when assembly is built. I'm not a maven expert but I don't think 
that classifiers are applied on transitive dependencies.
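
(A possible user-side workaround sketch until the assembly itself is fixed: depend on the hadoop2 flavor of avro-mapred explicitly so it takes precedence over the transitively resolved hadoop1 artifact; the coordinates and version here are illustrative and this is untested:)
{code}
// build.sbt fragment (illustrative, untested): request the hadoop2 classifier
// of avro-mapred directly instead of relying on the hadoop1 artifact that is
// currently pulled in transitively.
libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.6" classifier "hadoop2"
{code}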



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3966) Fix nullabilities of Cast related to DateType.

2014-10-15 Thread Takuya Ueshin (JIRA)
Takuya Ueshin created SPARK-3966:


 Summary: Fix nullabilities of Cast related to DateType.
 Key: SPARK-3966
 URL: https://issues.apache.org/jira/browse/SPARK-3966
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: Takuya Ueshin






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3966) Fix nullabilities of Cast related to DateType.

2014-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173400#comment-14173400
 ] 

Apache Spark commented on SPARK-3966:
-

User 'ueshin' has created a pull request for this issue:
https://github.com/apache/spark/pull/2820

 Fix nullabilities of Cast related to DateType.
 --

 Key: SPARK-3966
 URL: https://issues.apache.org/jira/browse/SPARK-3966
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: Takuya Ueshin





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3963) Support getting task-scoped properties from TaskContext

2014-10-15 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173404#comment-14173404
 ] 

Patrick Wendell commented on SPARK-3963:


[~rxin] and [~adav] I'd be interested in any feedback on this.

 Support getting task-scoped properties from TaskContext
 ---

 Key: SPARK-3963
 URL: https://issues.apache.org/jira/browse/SPARK-3963
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Patrick Wendell

 This is a proposal for a minor feature. Given stabilization of the 
 TaskContext API, it would be nice to have a mechanism for Spark jobs to 
 access properties that are defined based on task-level scope by Spark RDD's. 
 I'd like to propose adding a simple properties hash map with some standard 
 spark properties that users can access. Later it would be nice to support 
 users setting these properties, but for now to keep it simple in 1.2. I'd 
 prefer users not be able to set them.
 The main use case is providing the file name from Hadoop RDD's, a very common 
 request. But I'd imagine us using this for other things later on. We could 
 also use this to expose some of the taskMetrics, such as e.g. the input bytes.
 {code}
 val data = sc.textFile("s3n//..2014/*/*/*.json")
 data.mapPartitions { 
   val tc = TaskContext.get
   val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME)
   val parts = fileName.split("/")
   val (year, month, day) = (parts(3), parts(4), parts(5))
   ...
 }
 {code}
 Internally we'd have a method called setProperty, but this wouldn't be 
 exposed initially. This is structured as a simple (String, String) hash map 
 for ease of porting to python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3963) Support getting task-scoped properties from TaskContext

2014-10-15 Thread Patrick Wendell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-3963:
---
Description: 
This is a proposal for a minor feature. Given stabilization of the TaskContext 
API, it would be nice to have a mechanism for Spark jobs to access properties 
that are defined based on task-level scope by Spark RDD's. I'd like to propose 
adding a simple properties hash map with some standard spark properties that 
users can access. Later it would be nice to support users setting these 
properties, but for now to keep it simple in 1.2. I'd prefer users not be able 
to set them.

The main use case is providing the file name from Hadoop RDD's, a very common 
request. But I'd imagine us using this for other things later on. We could also 
use this to expose some of the taskMetrics, such as e.g. the input bytes.

{code}
val data = sc.textFile("s3n//..2014/*/*/*.json")
data.mapPartitions { 
  val tc = TaskContext.get
  val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME)
  val parts = fileName.split("/")
  val (year, month, day) = (parts(3), parts(4), parts(5))
  ...
}
{code}

Internally we'd have a method called setProperty, but this wouldn't be exposed 
initially. This is structured as a simple (String, String) hash map for ease of 
porting to python.

  was:
This is a proposal for a minor feature. Given stabilization of the TaskContext 
API, it would be nice to have a mechanism for Spark jobs to access properties 
that are defined based on task-level scope by Spark RDD's. I'd like to propose 
adding a simple properties hash map with some standard spark properties that 
users can access. Later it would be nice to support users setting these 
properties, but for now to keep it simple in 1.2. I'd prefer users not be able 
to set them.

The main use case is providing the file name from Hadoop RDD's, a very common 
request. But I'd imagine us using this for other things later on. We could also 
use this to expose some of the taskMetrics, such as e.g. the input bytes.

{code}
val data = sc.textFile("s3n//..2014/*/*/*.json")
data.mapPartitions { 
  val tc = TaskContext.get
  val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME)
  val parts = fileName.split("/")
  val (year, month, day) = (parts(3), parts(4), parts(5))
  ...
}
{code}

Internally we'd have a method called setProperty, but this wouldn't be exposed 
initially.


 Support getting task-scoped properties from TaskContext
 ---

 Key: SPARK-3963
 URL: https://issues.apache.org/jira/browse/SPARK-3963
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Patrick Wendell

 This is a proposal for a minor feature. Given stabilization of the 
 TaskContext API, it would be nice to have a mechanism for Spark jobs to 
 access properties that are defined based on task-level scope by Spark RDD's. 
 I'd like to propose adding a simple properties hash map with some standard 
 spark properties that users can access. Later it would be nice to support 
 users setting these properties, but for now to keep it simple in 1.2. I'd 
 prefer users not be able to set them.
 The main use case is providing the file name from Hadoop RDD's, a very common 
 request. But I'd imagine us using this for other things later on. We could 
 also use this to expose some of the taskMetrics, such as e.g. the input bytes.
 {code}
 val data = sc.textFile("s3n//..2014/*/*/*.json")
 data.mapPartitions { 
   val tc = TaskContext.get
   val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME)
   val parts = fileName.split("/")
   val (year, month, day) = (parts(3), parts(4), parts(5))
   ...
 }
 {code}
 Internally we'd have a method called setProperty, but this wouldn't be 
 exposed initially. This is structured as a simple (String, String) hash map 
 for ease of porting to python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3885) Provide mechanism to remove accumulators once they are no longer used

2014-10-15 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173407#comment-14173407
 ] 

Nathan Kronenfeld commented on SPARK-3885:
--

I tried reusing accumulators and clearing them, but I'm still running out of 
memory.

According to the profiler, there is a lot still held by 
Accumulators$.localAccums.

If I read things right, this is where the workers hold on to their versions of 
the accumulator?

It looks like there is a clear call, but it looks to me like it's only run at the beginning of a task (which means it should be keeping these around at the end of a job, until another task is run on that thread), which makes it a bit tough to profile.

Am I reading all this correctly? If so, I can see why it isn't cleared out at 
the end of a job - there's probably no way of doing that safely, since the 
workers don't know when the end is.

Ideally, I'd love a call whereby I can explicitly release an accumulator.  It 
seems to me that would require a parallel map in Accumulators$ that kept track 
of the threads on which each accumulator was stored, so it could clear them.

Am I understanding this all correctly?  If so, I think I could put together the 
fix I describe pretty easily.

 Provide mechanism to remove accumulators once they are no longer used
 -

 Key: SPARK-3885
 URL: https://issues.apache.org/jira/browse/SPARK-3885
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: Josh Rosen

 Spark does not currently provide any mechanism to delete accumulators after 
 they are no longer used.  This can lead to OOMs for long-lived SparkContexts 
 that create many large accumulators.
 Part of the problem is that accumulators are registered in a global 
 {{Accumulators}} registry.  Maybe the fix would be as simple as using weak 
 references in the Accumulators registry so that accumulators can be GC'd once 
 they can no longer be used.
 In the meantime, here's a workaround that users can try:
 Accumulators have a public setValue() method that can be called (only by the 
 driver) to change an accumulator’s value.  You might be able to use this to 
 reset accumulators’ values to smaller objects (e.g. the “zero” object of 
 whatever your accumulator type is, or ‘null’ if you’re sure that the 
 accumulator will never be accessed again).
 This issue was originally reported by [~nkronenfeld] on the dev mailing list: 
 http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-Accumulator-question-td8709.html
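 (A sketch of that driver-side workaround, assuming an existing SparkContext {{sc}} and a collection accumulator; the names and sizes are illustrative:)
 {code}
 import scala.collection.mutable

 // Accumulate into a mutable collection during the job...
 val seenKeys = sc.accumulableCollection(mutable.HashSet[String]())
 sc.parallelize(1 to 1000).foreach(i => seenKeys += "key-" + i)

 // ...then, once the result has been read on the driver, reset the accumulator
 // to its zero value so the large accumulated contents can be garbage-collected.
 println(seenKeys.value.size)
 seenKeys.setValue(mutable.HashSet[String]())
 {code}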



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org