[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172055#comment-14172055 ] Saisai Shao commented on SPARK-3948: Hi Josh, thanks for your help. I don't think an un-flushed input file is what causes the wrong copy length in the output file, since all of the {{partitionWriters}} are flushed and closed before they are copied to the output file, as you can see at line 704 in ExternalSorter: {{partitionWriters.foreach(_.commitAndClose())}}. I still suspect that the current use of {{transferTo}} has some problem that corrupts the output file; when I add the append flag for the output file at line 708, the problem seems to be gone.
Sort-based shuffle can lead to assorted stream-corruption exceptions
Key: SPARK-3948
URL: https://issues.apache.org/jira/browse/SPARK-3948
Project: Spark
Issue Type: Bug
Components: Shuffle
Affects Versions: 1.2.0
Reporter: Saisai Shao
Assignee: Saisai Shao
Several exceptions occurred when running TPC-DS queries against the latest master branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in Snappy, deserialization errors in Kryo, and offset out-of-range in FileManagedBuffer; all of these exceptions disappear when we switch to hash-based shuffle. With deeper investigation, we found that some shuffle output files are unexpectedly smaller than the others, as the log shows:
{noformat}
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
{noformat}
As you can see, the total file length of shuffle_6_11_11 is much smaller than the other map outputs of the same stage.
We also dumped the map outputs on the map side to check whether this small output is correct; below is the log:
{noformat}
In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405
length: 274722 262597 291290 272902 264941 270358 291005 295285 252482 287142 232617 259871 233734 241439 228897 234282 253834 235619 233803 255532 270739 253825 262087 266404 234273 250120 262983 257024 255947 254971 258908 247862 221613 258566 245399 251684 274843 226150 264278 245279 225656 235084 239466 212851 242245 218781 222191 215500 211548 234256 208601 204113 191923 217895 227020 215331 212313 223725 250876 256875 239276 266777 235520 237462 234063 242270 246825 255888 235937 236956 233099 264508 260303 233294 239061 254856 257475 230105 246553 260412 210355 211201 219572 206636 226866 209937 226618 218208 206255 248069 221717 222112 215734 248088 239207 246125 239056 241133 253091 246738 233128 242794 231606 255737 221123 252115 247286 229688 251087 250047 237579 263079 256251 238214 208641 201120 204009 200825 211965 200600 194492 226471 194887 226975 215072 206008 233288 222132 208860 219064 218162 237126 220465 201343 225711 232178 233786 212767 211462 213671 215853 227822 233782 214727 247001 228968 247413 222674 214241 184122 215643 207665 219079 215185 207718 212723 201613 216600 212591 208174 204195 208099 229079 230274 223373 214999 256626 228895 231821 383405 229646 220212 245495 245960 227556 213266 237203 203805 240509 239306 242365 218416 238487 219397 240026 251011 258369 255365 259811 283313 248450 264286 264562 257485 279459 249187 257609 274964 292369 273826
{noformat}
Here I dump the file name, the file length, and each partition's length; obviously the sum of all the partition lengths is not equal to the file length. So I think there may be a situation where the partitionWriter in ExternalSorter does not always append to the end of the previously written file - parts of the file's content get overwritten - and this leads to the exceptions I mentioned before. Also, when I changed copyStream to disable transferTo and use the previous implementation, all the issues were gone. So I think there may be some flushing problem in transferTo when the processed data is large.
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172062#comment-14172062 ] Josh Rosen commented on SPARK-3948: --- Hi [~jerryshao], To make sure that I understand, are you saying that changing
{code}
out = new FileOutputStream(outputFile)
{code}
to
{code}
out = new FileOutputStream(outputFile, true)  // append = true
{code}
seems to solve the issue, even while continuing to use the {{transferTo}} code?
[jira] [Created] (SPARK-3954) promote the speed of convert files to RDDS
宿荣全 created SPARK-3954: -- Summary: promote the speed of convert files to RDDS
Key: SPARK-3954
URL: https://issues.apache.org/jira/browse/SPARK-3954
Project: Spark
Issue Type: Improvement
Components: Input/Output
Affects Versions: 1.1.0, 1.0.0
Reporter: 宿荣全
When converting files to RDDs, the Spark source makes three passes over the file sequence:
1. files.map(...)
2. files.zip(fileRDDs)
3. the foreach over the zipped sequence
This is very time consuming when there are lots of files, so I made the following correction: the three passes over the file sequence become a single pass.
Spark source code:
{code}
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
  files.zip(fileRDDs).foreach { case (file, rdd) =>
    if (rdd.partitions.size == 0) {
      logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
        "files that have been \"moved\" to the directory assigned to the file stream. " +
        "Refer to the streaming programming guide for more details.")
    }
  }
  new UnionRDD(context.sparkContext, fileRDDs)
}
{code}
Modified code:
{code}
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
    if (rdd.partitions.size == 0) {
      logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
        "files that have been \"moved\" to the directory assigned to the file stream. " +
        "Refer to the streaming programming guide for more details.")
    }
    rdd
  }
  new UnionRDD(context.sparkContext, fileRDDs)
}
{code}
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172080#comment-14172080 ] Josh Rosen commented on SPARK-3948: --- I'm surprised that the lack of {{append}} didn't cause problems prior to the {{transferTo}} patch. To try to reason this out: *Old code, no append*: We use the same FileOutputStream object to write all of the partitions, copying each partition's data by calling {{FileOutputStream.write}} on this same FileOutputStream object. Since we're not in append mode, the first partition is written at the beginning of the file and all subsequent partitions are appended to it. *New {{transferTo}} code, no append:* We call {{FileOutputStream.getChannel()}} for each partition, which [returns the unique FileChannel object associated with this file output stream|http://docs.oracle.com/javase/7/docs/api/java/io/FileOutputStream.html#getChannel()]. Since we're not in append mode, this channel's initial position should be equal to the number of bytes written to the file so far, which should initially be 0 since we just created a new FileOutputStream and haven't written anything using it. According to [its docs|http://docs.oracle.com/javase/6/docs/api/java/nio/channels/FileChannel.html#transferTo(long, long, java.nio.channels.WritableByteChannel)], {{transferTo}} should increment the target channel's position by the number of bytes written. So, in this case it also seems like the first partition should be written to the beginning of the file while others are appended after it. Am I missing an obvious bug here? Is there a reason why these two copy implementations don't behave identically when append is disabled?
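To make the comparison concrete, here is a minimal, self-contained sketch of the two copy paths being discussed (the file names and the 8 KB buffer size are placeholders; this is not the actual Spark copyStream code):
{code}
import java.io.{File, FileInputStream, FileOutputStream}

// Placeholder files, for illustration only.
val inputFile  = new File("partition-0.data")
val outputFile = new File("merged.data")

val out = new FileOutputStream(outputFile)   // no append flag, as in both code paths

// Old path: plain byte-copy loop on the FileOutputStream. Each partition's bytes
// land wherever the stream's write pointer currently is, so successive copies
// follow one another even without the append flag.
val in  = new FileInputStream(inputFile)
val buf = new Array[Byte](8192)
var n = in.read(buf)
while (n != -1) {
  out.write(buf, 0, n)
  n = in.read(buf)
}
in.close()

// New path: NIO channel copy. Per the FileChannel javadoc, transferTo writes
// starting at the target channel's current position and advances it by the
// number of bytes written, so it should behave the same as the loop above.
val inChannel  = new FileInputStream(inputFile).getChannel()
val outChannel = out.getChannel()
var copied = 0L
val size = inChannel.size()
while (copied < size) {
  copied += inChannel.transferTo(copied, size - copied, outChannel)
}
inChannel.close()
out.close()
{code}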
[jira] [Created] (SPARK-3955) Different versions between jackson-mapper-asl and jackson-core-asl
Jongyoul Lee created SPARK-3955: --- Summary: Different versions between jackson-mapper-asl and jackson-core-asl
Key: SPARK-3955
URL: https://issues.apache.org/jira/browse/SPARK-3955
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 1.1.0
Reporter: Jongyoul Lee
The parent pom.xml specifies a version of jackson-mapper-asl, which is used by sql/hive/pom.xml. When mvn assembly runs, however, the version of jackson-mapper-asl is not the same as the version of jackson-core-asl. This is because other libraries depend on several versions of Jackson, so a different version of jackson-core-asl ends up in the assembly. This can simply be fixed by giving pom.xml explicit version information for jackson-core-asl. If it is not set, version 1.9.11 is merged into assembly.jar and we cannot use the Jackson library properly.
{code}
[INFO] Including org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8 in the shaded jar.
[INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.11 in the shaded jar.
{code}
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172116#comment-14172116 ] Mridul Muralidharan commented on SPARK-3948: [~joshrosen] Assuming there are no VM bugs being hit for inChannel.size() or some other concurrent writes to these files, I don't see any issues with the code - as you elaborated. On the other hand, the external sort code is slightly loose w.r.t. its use of the file API - not sure if that is causing the observed problems: for example, the use of skip() in SortShuffleManager.scala. We will need to investigate in detail whether some of these are causing the observed problems.
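For context on why {{skip()}} counts as "loose" use of the file API: {{InputStream.skip(n)}} is allowed to skip fewer than n bytes, so a caller that trusts a single call can silently end up at the wrong offset. A generic illustration of skipping robustly (not the SortShuffleManager code):
{code}
import java.io.{EOFException, InputStream}

// Skip exactly `n` bytes or fail, instead of trusting a single skip() call.
def skipFully(in: InputStream, n: Long): Unit = {
  var remaining = n
  while (remaining > 0) {
    val skipped = in.skip(remaining)
    if (skipped <= 0) {
      // skip() may legally return 0; fall back to reading one byte to make progress.
      if (in.read() == -1) {
        throw new EOFException(s"Reached end of stream with $remaining bytes left to skip")
      }
      remaining -= 1
    } else {
      remaining -= skipped
    }
  }
}
{code}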
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172117#comment-14172117 ] Saisai Shao commented on SPARK-3948: Hi Josh, I think the old code is correct even without the {{append}} flag, because we never close the output stream between partitions, so each partition's input stream is always written at the end of the output stream; I think that is correct. For the new {{transferTo}} code without append, in theory the target channel's position should be incremented by the number of bytes written, but according to my investigation of the logs, for the subsequent partitions the output channel's position unexpectedly points to 0 rather than to the previous output file size. That is the unexplained behavior that confuses me a lot.
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172134#comment-14172134 ] Mridul Muralidharan commented on SPARK-3948: [~jerryshao] Just to clarify, what exactly is the behavior you are observing? - Is it that getChannel is returning a channel which has position == 0 after writing bytes to the stream (size > 0)? If yes, what is the channel's length you are observing in that case? Also, how large are the file sizes? The documentation of getChannel and transferTo is fairly unambiguous ... so our code, as written, conforms to it. Of course, it is always possible we are hitting some bug in some scenarios! What is the environment you are running this on, btw? OS/JVM version? Thanks.
[jira] [Commented] (SPARK-3955) Different versions between jackson-mapper-asl and jackson-core-asl
[ https://issues.apache.org/jira/browse/SPARK-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172148#comment-14172148 ] Sean Owen commented on SPARK-3955: -- Looks like Jackson is managed to version 1.8.8 for Avro reasons. I think core just needs to be managed the same way. I'll try it locally to make sure that works.
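A rough sketch of the kind of change being suggested, assuming it goes into the parent pom's dependencyManagement section (the snippet below is illustrative, not the actual Spark pom):
{code:xml}
<dependencyManagement>
  <dependencies>
    <!-- Pin jackson-core-asl to the same version already managed for jackson-mapper-asl -->
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
      <version>1.8.8</version>
    </dependency>
  </dependencies>
</dependencyManagement>
{code:xml}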
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172152#comment-14172152 ] Saisai Shao commented on SPARK-3948: Hi [~mridul], what I observed is that after writing bytes to the channel, the position is still 0 on the next call. Here is one of the log lines I traced:
{noformat}
java.lang.AssertionError: assertion failed: outChannel before size, 272314, before position: 0. Current size: 284167, current position: 0, expected write data size: 284167
{noformat}
I just print out the channel size and position *before* and *after* the {{transferTo}} action. As you can see, the output file size is not 0 before the write, but the position is 0, not the end of the channel. After writing to it with {{transferTo}}, the file size has changed, but not to 272314 + 284167 as expected, and the position also does not move to the end. When I add the append flag as I mentioned before, the log changes to:
{noformat}
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 0, before position: 0. Current size: 294896, current position: 294896, expected write data size: 294896
14/10/15 13:14:12 INFO util.Utils: outChannel before size, 294896, before position: 294896. Current size: 551216, current position: 551216, expected write data size: 256320
{noformat}
My OS is Red Hat 6.2 and the JVM is 1.8.0_20-ea. Appreciate your suggestions, thanks a lot.
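A rough sketch of the kind of instrumentation described above, wrapping the {{transferTo}} loop with size/position checks (the names and the log format are illustrative, not the actual Utils.copyStream code):
{code}
import java.io.{FileInputStream, FileOutputStream}

// Illustrative only: record the output channel's size/position around transferTo,
// mirroring the assertion output quoted above.
def checkedTransfer(in: FileInputStream, out: FileOutputStream): Long = {
  val inChannel  = in.getChannel()
  val outChannel = out.getChannel()
  val beforeSize = outChannel.size()
  val beforePos  = outChannel.position()
  val toCopy     = inChannel.size()
  var copied = 0L
  while (copied < toCopy) {
    copied += inChannel.transferTo(copied, toCopy - copied, outChannel)
  }
  println(s"outChannel before size: $beforeSize, before position: $beforePos. " +
    s"Current size: ${outChannel.size()}, current position: ${outChannel.position()}, " +
    s"expected write data size: $toCopy")
  assert(outChannel.position() == beforePos + copied,
    "transferTo did not advance the output channel's position as expected")
  copied
}
{code}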
[jira] [Closed] (SPARK-3898) History Web UI display incorrectly.
[ https://issues.apache.org/jira/browse/SPARK-3898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zzc closed SPARK-3898. -- Resolution: Not a Problem Target Version/s: 1.2.0
History Web UI display incorrectly.
Key: SPARK-3898
URL: https://issues.apache.org/jira/browse/SPARK-3898
Project: Spark
Issue Type: Bug
Components: Web UI
Affects Versions: 1.2.0
Environment: Spark 1.2.0-snapshot on YARN
Reporter: zzc
Fix For: 1.2.0
After successfully running a Spark application, the history web UI displays incorrectly:
App Name: Not Started
Started: 1970/01/01 07:59:59
Spark User: Not Started
Last Updated: 2014/10/10 14:50:39
Exception message:
{noformat}
2014-10-10 14:51:14,284 - ERROR - org.apache.spark.Logging$class.logError(Logging.scala:96) - qtp1594785497-16851 - Exception in parsing Spark event log hdfs://wscluster/sparklogs/24.3g_15_5g_2c-1412923684977/EVENT_LOG_1
org.json4s.package$MappingException: Did not find value which can be converted into int
    at org.json4s.reflect.package$.fail(package.scala:96)
    at org.json4s.Extraction$.convert(Extraction.scala:554)
    at org.json4s.Extraction$.extract(Extraction.scala:331)
    at org.json4s.Extraction$.extract(Extraction.scala:42)
    at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
    at org.apache.spark.util.JsonProtocol$.blockManagerIdFromJson(JsonProtocol.scala:647)
    at org.apache.spark.util.JsonProtocol$.blockManagerAddedFromJson(JsonProtocol.scala:468)
    at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:404)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
    at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$loadAppInfo(FsHistoryProvider.scala:181)
    at org.apache.spark.deploy.history.FsHistoryProvider.getAppUI(FsHistoryProvider.scala:99)
    at org.apache.spark.deploy.history.HistoryServer$$anon$3.load(HistoryServer.scala:55)
    at org.apache.spark.deploy.history.HistoryServer$$anon$3.load(HistoryServer.scala:53)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.deploy.history.HistoryServer$$anon$1.doGet(HistoryServer.scala:88)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
    at org.eclipse.jetty.server.Server.handle(Server.java:370)
    at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
    at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
    at
{noformat}
[jira] [Commented] (SPARK-3954) promote the speed of convert files to RDDS
[ https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172167#comment-14172167 ] Apache Spark commented on SPARK-3954: - User 'surq' has created a pull request for this issue: https://github.com/apache/spark/pull/2811
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172225#comment-14172225 ] Mridul Muralidharan commented on SPARK-3948: That is weird. I tried a stripped-down version to test just this - and it seems to be working fine. In the Scala interpreter, this works as expected.
{code:java}
import java.io._
import java.nio.ByteBuffer
import java.nio._
import java.nio.channels._

def copyStream(in: InputStream, out: OutputStream, closeStreams: Boolean = false): Long = {
  var count = 0L
  try {
    if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
      // When both streams are file streams, use transferTo to improve copy performance.
      val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      println("size = " + outChannel.size)
      println("position = " + outChannel.position)
      val size = inChannel.size()
      // In case the transferTo method transferred less data than we required.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
    } else {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          out.write(buf, 0, n)
          count += n
        }
      }
    }
    count
  } finally {
    if (closeStreams) {
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }
}

val out = new FileOutputStream("output")
for (i <- 0 until 10) {
  val in = new FileInputStream("t")
  val size = copyStream(in, out, false)
  println("size = " + size + " for i = " + i)
  in.close()
}
out.close()
{code}
Scenarios tried:
a) No output file.
b) Empty output file.
c) Non-empty output file.
It seemed to work fine (and as expected) for all the cases. Can you try this at your end? I want to eliminate any potential environment issues. I tried this with 1.7.0_55 and 1.8.0_25 ...
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172226#comment-14172226 ] Mridul Muralidharan commented on SPARK-3948: Note, t is just some file with a few strings in it - simply generate something locally.
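For completeness, a trivial way to generate such a file before running the snippet above (only the file name "t" is taken from the test code; the contents are arbitrary):
{code}
import java.io.{File, PrintWriter}

// Create a small text file named "t" in the current directory.
val writer = new PrintWriter(new File("t"))
(1 to 10).foreach(i => writer.println("some test line " + i))
writer.close()
{code}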
[jira] [Commented] (SPARK-3939) NPE caused by SessionState.out not set in thriftserver2
[ https://issues.apache.org/jira/browse/SPARK-3939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172230#comment-14172230 ] Apache Spark commented on SPARK-3939: - User 'adrian-wang' has created a pull request for this issue: https://github.com/apache/spark/pull/2812
NPE caused by SessionState.out not set in thriftserver2
Key: SPARK-3939
URL: https://issues.apache.org/jira/browse/SPARK-3939
Project: Spark
Issue Type: Bug
Components: SQL
Reporter: Adrian Wang
Assignee: Adrian Wang
A simple 'set' query can reproduce this in the thrift server.
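The title suggests the NPE comes from Hive's {{SessionState.out}} being null when the thrift server runs a command such as {{set}}. A minimal sketch of the kind of initialization that avoids it (illustrative only; it assumes a Hive session is already started and is not necessarily what the linked pull request does):
{code}
import java.io.PrintStream
import org.apache.hadoop.hive.ql.session.SessionState

// Illustrative: give the Hive session somewhere to print command output,
// so commands such as SET do not hit a null output stream.
val sessionState = SessionState.get()
if (sessionState.out == null) {
  sessionState.out = new PrintStream(System.out, true, "UTF-8")
}
if (sessionState.err == null) {
  sessionState.err = new PrintStream(System.err, true, "UTF-8")
}
{code}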
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172371#comment-14172371 ] Saisai Shao commented on SPARK-3948: Hi [~mridulm80], thanks a lot for your suggestions. I tested with different Java versions and two OSes, Red Hat 6.2 and Ubuntu 14.04, and I think this behavior is OS- or kernel-related, not JVM-version-related; you can see the output below. First is the result under Red Hat; I tried Java 1.8.0_20, 1.7.0_60 and 1.7.0_04, and the result is:
{noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 0
size = 118 for i = 1
size = 118
position = 0
size = 118 for i = 2
size = 118
position = 0
size = 118 for i = 3
size = 118
position = 0
size = 118 for i = 4
size = 118
position = 0
size = 118 for i = 5
size = 118
position = 0
size = 118 for i = 6
size = 118
position = 0
size = 118 for i = 7
size = 118
position = 0
size = 118 for i = 8
size = 118
position = 0
size = 118 for i = 9
{noformat}
Obviously the position is always 0, so the final output file size is just 118. Then I changed to Ubuntu 14.04, with Java 1.7.0_04 and 1.7.0_51; the result is shown below:
{noformat}
size = 0
position = 0
size = 118 for i = 0
size = 118
position = 118
size = 118 for i = 1
size = 236
position = 236
size = 118 for i = 2
size = 354
position = 354
size = 118 for i = 3
size = 472
position = 472
size = 118 for i = 4
size = 590
position = 590
size = 118 for i = 5
size = 708
position = 708
size = 118 for i = 6
size = 826
position = 826
size = 118 for i = 7
size = 944
position = 944
size = 118 for i = 8
size = 1062
position = 1062
size = 118 for i = 9
{noformat}
This is the result we expected, and the position also seeks to the right end, so the final output file size is 1180 as expected. My Ubuntu machine's kernel version is 3.13.0-37-generic; the Red Hat machine's kernel version is 2.6.32-220.el6.x86_64. So I guess the difference depends on the kernel or OS version, but this still needs to be verified. Besides, to keep things consistent, I think we can add the *append=true* flag to enforce the same output across different platforms.
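A minimal sketch of the proposed workaround applied to the test snippet above (this only illustrates the append-flag idea and reuses the placeholder file names "t" and "output"; it is not the final fix for the shuffle code):
{code}
import java.io.{FileInputStream, FileOutputStream}

// Open the destination in append mode so the underlying file offset starts at
// the end of the file on every platform, instead of relying on transferTo to
// advance the channel position consistently across kernels.
val out = new FileOutputStream("output", /* append = */ true)
val in  = new FileInputStream("t")
val inChannel  = in.getChannel()
val outChannel = out.getChannel()
var copied = 0L
val size = inChannel.size()
while (copied < size) {
  copied += inChannel.transferTo(copied, size - copied, outChannel)
}
in.close()
out.close()
{code}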
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172387#comment-14172387 ] Praveen Seluka commented on SPARK-3174: --- [~andrewor] - Can you also comment on the API exposed to add/delete executors from SparkContext? I believe it will be something like sc.addExecutors(count: Int) and sc.deleteExecutors(List[String]). [~sandyr] [~tgraves] [~andrewor] [~vanzin] - Can you please take a look at the design doc I have proposed? I am sure there are some pros in doing it this way - I have indicated them in detail in the doc. Since it does not change Spark Core itself, you could easily replace it with another pluggable algorithm for dynamic scaling. I know that Andrew already has a PR based on his design doc, but I would surely love to get some feedback. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, SparkElasticScalingDesignB.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
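A hypothetical Scala sketch of the API shape proposed in the comment above; these method names and signatures are the commenter's proposal, not an existing SparkContext API, so they are stubbed out here rather than called on a real SparkContext:
{code}
// Illustrative only: a stand-in trait with the proposed signatures, since the
// methods do not exist on SparkContext today.
trait ExecutorScaling {
  def addExecutors(count: Int): Unit
  def deleteExecutors(executorIds: List[String]): Unit
}

// Hypothetical usage, assuming SparkContext (or a pluggable scaler) exposed the trait.
def rescale(scaler: ExecutorScaling): Unit = {
  scaler.addExecutors(3)                                    // grow by three executors
  scaler.deleteExecutors(List("executor-7", "executor-8"))  // later, drop idle ones by ID
}
{code}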
[jira] [Commented] (SPARK-2075) Anonymous classes are missing from Spark distribution
[ https://issues.apache.org/jira/browse/SPARK-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172394#comment-14172394 ] Iulian Dragos commented on SPARK-2075: -- The Scala compiler produces stable names for anonymous functions. In fact, that's the reason why the name of the enclosing method is part of the name: so that adding or removing an anonymous function in another method does not change the numbering of the others. Names are assigned by using a per-compilation unit counter and a prefix. Looking at the diff, there's quite a different picture in the two cases (anonymous functions vs. anonymous classes). Are you sure the two jars are built from the same sources? I don't know how the `assembly` jar is produced, but if it's using some sort of whole-program analysis and dead-code elimination, it might erroneously remove them. It might help to look at the inputs to the assembly and see if the class is already missing. Another possibility is running `scalac -optimize` in only one of the two builds. However, looking at current sources I can't see why the inliner would remove those closures (the class is not final, and `map` is not final either, so they can't be resolved and inlined).. Anonymous classes are missing from Spark distribution - Key: SPARK-2075 URL: https://issues.apache.org/jira/browse/SPARK-2075 Project: Spark Issue Type: Bug Components: Build, Spark Core Affects Versions: 1.0.0 Reporter: Paul R. Brown Priority: Critical Fix For: 1.0.1 Running a job built against the Maven dep for 1.0.0 and the hadoop1 distribution produces: {code} java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 {code} Here's what's in the Maven dep as of 1.0.0: {code} jar tvf ~/.m2/repository/org/apache/spark/spark-core_2.10/1.0.0/spark-core_2.10-1.0.0.jar | grep 'rdd/RDD' | grep 'saveAs' 1519 Mon May 26 13:57:58 PDT 2014 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$1.class 1560 Mon May 26 13:57:58 PDT 2014 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$2.class {code} And here's what's in the hadoop1 distribution: {code} jar tvf spark-assembly-1.0.0-hadoop1.0.4.jar| grep 'rdd/RDD' | grep 'saveAs' {code} I.e., it's not there. It is in the hadoop2 distribution: {code} jar tvf spark-assembly-1.0.0-hadoop2.2.0.jar| grep 'rdd/RDD' | grep 'saveAs' 1519 Mon May 26 07:29:54 PDT 2014 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$1.class 1560 Mon May 26 07:29:54 PDT 2014 org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$2.class {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3629) Improvements to YARN doc
[ https://issues.apache.org/jira/browse/SPARK-3629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172416#comment-14172416 ] ssj commented on SPARK-3629: I have opened a pull request on GitHub for this: https://github.com/apache/spark/pull/2813 Improvements to YARN doc Key: SPARK-3629 URL: https://issues.apache.org/jira/browse/SPARK-3629 Project: Spark Issue Type: Documentation Components: Documentation, YARN Reporter: Matei Zaharia Labels: starter Right now this doc starts off with a big list of config options, and only then tells you how to submit an app. It would be better to put that part and the packaging part first, and the config options only at the end. In addition, the doc mentions yarn-cluster vs yarn-client as separate masters, which is inconsistent with the help output from spark-submit (which says to always use yarn). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3629) Improvements to YARN doc
[ https://issues.apache.org/jira/browse/SPARK-3629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172444#comment-14172444 ] Apache Spark commented on SPARK-3629: - User 'ssjssh' has created a pull request for this issue: https://github.com/apache/spark/pull/2813 Improvements to YARN doc Key: SPARK-3629 URL: https://issues.apache.org/jira/browse/SPARK-3629 Project: Spark Issue Type: Documentation Components: Documentation, YARN Reporter: Matei Zaharia Labels: starter Right now this doc starts off with a big list of config options, and only then tells you how to submit an app. It would be better to put that part and the packaging part first, and the config options only at the end. In addition, the doc mentions yarn-cluster vs yarn-client as separate masters, which is inconsistent with the help output from spark-submit (which says to always use yarn). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1209) SparkHadoopUtil should not use package org.apache.hadoop
[ https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172479#comment-14172479 ] Apache Spark commented on SPARK-1209: - User 'srowen' has created a pull request for this issue: https://github.com/apache/spark/pull/2814 SparkHadoopUtil should not use package org.apache.hadoop Key: SPARK-1209 URL: https://issues.apache.org/jira/browse/SPARK-1209 Project: Spark Issue Type: Bug Affects Versions: 0.9.0 Reporter: Sandy Ryza Assignee: Mark Grover It's private, so the change won't break compatibility -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2429) Hierarchical Implementation of KMeans
[ https://issues.apache.org/jira/browse/SPARK-2429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172486#comment-14172486 ] RJ Nowling commented on SPARK-2429: --- Great to know! I'm glad that isn't a bottleneck. Have you been able to benchmark each of the major steps? Which steps are most expensive? Hierarchical Implementation of KMeans - Key: SPARK-2429 URL: https://issues.apache.org/jira/browse/SPARK-2429 Project: Spark Issue Type: New Feature Components: MLlib Reporter: RJ Nowling Assignee: Yu Ishikawa Priority: Minor Attachments: The Result of Benchmarking a Hierarchical Clustering.pdf Hierarchical clustering algorithms are widely used and would make a nice addition to MLlib. Clustering algorithms are useful for determining relationships between clusters as well as offering faster assignment. Discussion on the dev list suggested the following possible approaches: * Top down, recursive application of KMeans * Reuse DecisionTree implementation with different objective function * Hierarchical SVD It was also suggested that support for distance metrics other than Euclidean such as negative dot or cosine are necessary. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3873) Scala style: check import ordering
[ https://issues.apache.org/jira/browse/SPARK-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172552#comment-14172552 ] Marcelo Vanzin commented on SPARK-3873: --- It doesn't seem like I can unassign bugs... Scala style: check import ordering -- Key: SPARK-3873 URL: https://issues.apache.org/jira/browse/SPARK-3873 Project: Spark Issue Type: Sub-task Components: Project Infra Reporter: Reynold Xin Assignee: Marcelo Vanzin -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1209) SparkHadoopUtil should not use package org.apache.hadoop
[ https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172571#comment-14172571 ] Sean Owen commented on SPARK-1209: -- ... and why wouldn't you, that's the title of the JIRA, oops. It's not that class that moves or even changes actually, and yes it should not move. Let me fix the title and fix my PR too. Maybe that's a more palatable change. SparkHadoopUtil should not use package org.apache.hadoop Key: SPARK-1209 URL: https://issues.apache.org/jira/browse/SPARK-1209 Project: Spark Issue Type: Bug Affects Versions: 0.9.0 Reporter: Sandy Ryza Assignee: Mark Grover It's private, so the change won't break compatibility -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172582#comment-14172582 ] Josh Rosen commented on SPARK-3948: --- That kernel bug seems like it could explain why the {{transferTo}} change caused problems in sort-based shuffle. It looks like SPARK-3630 also describes some other places where the Kryo PARSING_ERROR(2) has occurred, so I'm going to try to figure out which of these other cases might also hit this {{transferTo}} code path. In terms of a bug fix / workaround: maybe we can open the file in append mode, since that seems to solve the problem, and also add an internal configuration option to disable the transferTo code; this configuration / feature-flag would provide an escape hatch for users in case the append fix doesn't work. Sort-based shuffle can lead to assorted stream-corruption exceptions Key: SPARK-3948 URL: https://issues.apache.org/jira/browse/SPARK-3948 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Saisai Shao Assignee: Saisai Shao
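A rough Scala sketch of the shape of that workaround, not the actual patch; the helper below and the idea of wiring the fallback to a boolean flag are illustrative, and the concrete configuration key would still have to be chosen:
{code}
import java.io.{File, FileInputStream, FileOutputStream}

// Hypothetical illustration of the proposed fix plus escape hatch: open the
// destination in append mode, and keep a plain stream copy behind a flag in
// case transferTo still misbehaves on some platform.
def copyFile(src: File, dst: File, transferToEnabled: Boolean): Unit = {
  val out = new FileOutputStream(dst, true)                 // append = true
  if (transferToEnabled) {
    val inCh = new FileInputStream(src).getChannel
    val outCh = out.getChannel
    var copied = 0L
    while (copied < inCh.size()) {
      copied += inCh.transferTo(copied, inCh.size() - copied, outCh)
    }
    inCh.close()
  } else {
    // Escape hatch: buffered byte copy, avoiding transferTo altogether.
    val in = new FileInputStream(src)
    val buf = new Array[Byte](8 * 1024)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    in.close()
  }
  out.close()
}
{code}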
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172585#comment-14172585 ] Mridul Muralidharan commented on SPARK-3948: [~jerryshao] great work! I agree, append might be a workaround to consider (given the semantics of getChannel when the stream is opened with append). On the other hand, since this piece of code (copyStreams) might also be used in a more general context, what about logging a warning in case position != initialPosition + size at the end of the transferTo loop, warning users that they should upgrade their kernel (and explicitly modifying the position as a workaround)? Sort-based shuffle can lead to assorted stream-corruption exceptions Key: SPARK-3948 URL: https://issues.apache.org/jira/browse/SPARK-3948 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Saisai Shao Assignee: Saisai Shao
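A small Scala sketch of the defensive check being suggested above (an illustration only, not the actual copyStreams code): verify that transferTo really advanced the destination channel, warn about the suspected kernel bug, and repair the position explicitly.
{code}
import java.nio.channels.FileChannel

// Illustrative helper: copy an entire input channel and check that the
// destination position ends up where it should.
def transferWithCheck(in: FileChannel, out: FileChannel): Long = {
  val initialPos = out.position()
  val size = in.size()
  var copied = 0L
  while (copied < size) {
    copied += in.transferTo(copied, size - copied, out)
  }
  if (out.position() != initialPos + size) {
    println(s"WARN: transferTo left position at ${out.position()} instead of " +
      s"${initialPos + size}; your kernel may need upgrading. Fixing position manually.")
    out.position(initialPos + size)   // explicit workaround
  }
  copied
}
{code}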
[jira] [Commented] (SPARK-3704) the types not match adding value form spark row to hive row in SparkSQLOperationManager
[ https://issues.apache.org/jira/browse/SPARK-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172645#comment-14172645 ] JIng Wang commented on SPARK-3704: -- Will this fix be merged to branch-v1.1 for the next 1.1.x release? the types not match adding value form spark row to hive row in SparkSQLOperationManager --- Key: SPARK-3704 URL: https://issues.apache.org/jira/browse/SPARK-3704 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.1.0 Reporter: wangfei Fix For: 1.2.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3956) Python API for Distributed Matrix
[ https://issues.apache.org/jira/browse/SPARK-3956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davies Liu updated SPARK-3956: -- Priority: Minor (was: Blocker) Python API for Distributed Matrix - Key: SPARK-3956 URL: https://issues.apache.org/jira/browse/SPARK-3956 Project: Spark Issue Type: New Feature Reporter: Davies Liu Assignee: Davies Liu Priority: Minor Python API for distributed matrix -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3174) Provide elastic scaling within a Spark application
[ https://issues.apache.org/jira/browse/SPARK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172861#comment-14172861 ] Marcelo Vanzin commented on SPARK-3174: --- [~praveenseluka] I took a quick look at your design. I have a few comments:
* It ignores the shuffle, which is covered in Andrew's design. You can't scale down without figuring out what to do with the shuffle data of completed stages, since it may be reused. It's not just about cached blocks.
* Overall, I like the idea of having pluggable scaling algorithms, but I think it's too early for a public add/remove executor API. For example, standalone mode doesn't support adding or removing executors, AFAIK.
* The task runtime-based heuristic looks a bit weird. I think it needs to be more dynamic - e.g., actually consider the backlog of tasks vs. the number of tasks that can be run concurrently on the current executors. Also, doubling the number of executors seems a bit like a sledgehammer - an exponential approach like Andrew has proposed might be able to reach similar results with lower overall system load.
I haven't looked at Andrew's PR yet (it's on my todo list), but I don't see a lot that is very different between the two proposals. I agree that, if possible, it would be nice to keep the autoscaling code / interfaces as separate as possible from the current core, if only to make them easier to replace / refactor / disable, but then again, I haven't looked at Andrew's code yet. Provide elastic scaling within a Spark application -- Key: SPARK-3174 URL: https://issues.apache.org/jira/browse/SPARK-3174 Project: Spark Issue Type: Improvement Components: Spark Core, YARN Affects Versions: 1.0.2 Reporter: Sandy Ryza Assignee: Andrew Or Attachments: SPARK-3174design.pdf, SparkElasticScalingDesignB.pdf, dynamic-scaling-executors-10-6-14.pdf A common complaint with Spark in a multi-tenant environment is that applications have a fixed allocation that doesn't grow and shrink with their resource needs. We're blocked on YARN-1197 for dynamically changing the resources within executors, but we can still allocate and discard whole executors. It would be useful to have some heuristics that * Request more executors when many pending tasks are building up * Discard executors when they are idle See the latest design doc for more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
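To make the contrast concrete, here is a toy Scala sketch of the kind of backlog-aware, exponential ramp-up being discussed; it is an illustration of the idea only, with made-up parameter names, not Andrew's actual algorithm or the eventual Spark implementation:
{code}
// Each scheduling round: grow the request exponentially (1, 2, 4, 8, ...),
// but never ask for more than the task backlog can actually use and never
// exceed the configured ceiling.
def executorsToRequest(pendingTasks: Int, tasksPerExecutor: Int,
                       requestedLastRound: Int, currentExecutors: Int,
                       maxExecutors: Int): Int = {
  val backlogExecutors = math.ceil(pendingTasks.toDouble / tasksPerExecutor).toInt
  val rampUp = math.max(1, requestedLastRound * 2)
  math.min(math.min(rampUp, backlogExecutors), maxExecutors - currentExecutors).max(0)
}
{code}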
[jira] [Commented] (SPARK-3936) Incorrect result in GraphX BytecodeUtils with closures + class/object methods
[ https://issues.apache.org/jira/browse/SPARK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172867#comment-14172867 ] Apache Spark commented on SPARK-3936: - User 'jegonzal' has created a pull request for this issue: https://github.com/apache/spark/pull/2815 Incorrect result in GraphX BytecodeUtils with closures + class/object methods - Key: SPARK-3936 URL: https://issues.apache.org/jira/browse/SPARK-3936 Project: Spark Issue Type: Bug Components: GraphX Reporter: Pedro Rodriguez Assignee: Ankur Dave There seems to be a bug with the GraphX byte code inspection, specifically in BytecodeUtils. These are the unit tests I wrote to expose the problem: https://github.com/EntilZha/spark/blob/a3c38a8329545c034fae2458df134fa3829d08fb/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala#L93-L121 The first two tests pass, the second two tests fail. This exposes a problem with inspection of methods in closures, in this case within maps. Specifically, it seems like there is a problem with inspection of non-inline methods in a closure. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
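To make the failing shape concrete, here is a loose, self-contained Scala illustration (not the linked test code, and it does not invoke BytecodeUtils) of the difference the reporter describes between a closure that invokes a method directly and one that reaches it only through another, non-inlined method:
{code}
class Ops {
  def touch(x: Int): Int = x + 1

  // Direct: the call to `touch` appears in the closure's own bytecode.
  def direct(xs: Seq[Int]): Seq[Int] = xs.map(x => touch(x))

  // Indirect: the closure calls `helper`, and only `helper`'s bytecode calls
  // `touch`, so a bytecode walker has to follow that extra hop to see the call.
  def helper(x: Int): Int = touch(x)
  def indirect(xs: Seq[Int]): Seq[Int] = xs.map(x => helper(x))
}
{code}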
[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions
[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172895#comment-14172895 ] Josh Rosen commented on SPARK-3948: --- [~mridulm80], that explicit position-checking and modification idea is a clever workaround. I think that this JIRA explains the cause of the stream-corruption issues that we've seen in the sort-based shuffle code, but after some investigation I think that it doesn't explain the similar errors that we've seen when deserializing tasks that were broadcasted with TorrentBroadcast; I'll keep investigating this. Sort-based shuffle can lead to assorted stream-corruption exceptions Key: SPARK-3948 URL: https://issues.apache.org/jira/browse/SPARK-3948 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Saisai Shao Assignee: Saisai Shao
[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172917#comment-14172917 ] Fi commented on SPARK-2883: --- This sounds great! Is this a Scala-only patch, or would it also somehow work from Python? Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: Bug Components: Input/Output, SQL Reporter: Zhan Zhang Priority: Blocker Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png Verify the support of OrcInputFormat in Spark, fix issues if any exist, and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3630) Identify cause of Kryo+Snappy PARSING_ERROR
[ https://issues.apache.org/jira/browse/SPARK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172926#comment-14172926 ] Josh Rosen commented on SPARK-3630: --- I think that there may be multiple causes of these decompression errors: *Errors in sort-based shuffle*: SPARK-3948 suggests that this is due to stream-corruption that's caused by an interaction between a Linux kernel bug and our use of {{transferTo}} for performing zero-copy IO when spilling shuffle data. *Errors when deserializing broadcasted tasks*: this is probably also caused by stream-corruption, but the source of that corruption isn't clear yet. I've opened SPARK-3958 to investigate TorrentBroadcast separately. If you encounter a Kryo or Snappy decompression error that is _not_ covered by these two issues, please post a comment in this issue to let me know. Identify cause of Kryo+Snappy PARSING_ERROR --- Key: SPARK-3630 URL: https://issues.apache.org/jira/browse/SPARK-3630 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 1.1.0 Reporter: Andrew Ash A recent GraphX commit caused non-deterministic exceptions in unit tests so it was reverted (see SPARK-3400). Separately, [~aash] observed the same exception stacktrace in an application-specific Kryo registrator: {noformat} com.esotericsoftware.kryo.KryoException: java.io.IOException: failed to uncompress the chunk: PARSING_ERROR(2) com.esotericsoftware.kryo.io.Input.fill(Input.java:142) com.esotericsoftware.kryo.io.Input.require(Input.java:169) com.esotericsoftware.kryo.io.Input.readInt(Input.java:325) com.esotericsoftware.kryo.io.Input.readFloat(Input.java:624) com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:127) com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:117) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) ... {noformat} This ticket is to identify the cause of the exception in the GraphX commit so the faulty commit can be fixed and merged back into master. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3630) Identify cause of Kryo+Snappy PARSING_ERROR
[ https://issues.apache.org/jira/browse/SPARK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-3630: -- Target Version/s: 1.1.1, 1.2.0 Affects Version/s: 1.2.0 Assignee: Josh Rosen Identify cause of Kryo+Snappy PARSING_ERROR --- Key: SPARK-3630 URL: https://issues.apache.org/jira/browse/SPARK-3630 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 1.1.0, 1.2.0 Reporter: Andrew Ash Assignee: Josh Rosen A recent GraphX commit caused non-deterministic exceptions in unit tests so it was reverted (see SPARK-3400). Separately, [~aash] observed the same exception stacktrace in an application-specific Kryo registrator: {noformat} com.esotericsoftware.kryo.KryoException: java.io.IOException: failed to uncompress the chunk: PARSING_ERROR(2) com.esotericsoftware.kryo.io.Input.fill(Input.java:142) com.esotericsoftware.kryo.io.Input.require(Input.java:169) com.esotericsoftware.kryo.io.Input.readInt(Input.java:325) com.esotericsoftware.kryo.io.Input.readFloat(Input.java:624) com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:127) com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer.read(DefaultSerializers.java:117) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) ... {noformat} This ticket is to identify the cause of the exception in the GraphX commit so the faulty commit can be fixed and merged back into master. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3959) SqlParser fails to parse literal -9223372036854775808 (Long.MinValue).
Kousuke Saruta created SPARK-3959: - Summary: SqlParser fails to parse literal -9223372036854775808 (Long.MinValue). Key: SPARK-3959 URL: https://issues.apache.org/jira/browse/SPARK-3959 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.2.0 Reporter: Kousuke Saruta Priority: Critical SqlParser fails to parse -9223372036854775808 (Long.MinValue), so we cannot write queries such as the following.
{code}
SELECT value FROM someTable WHERE value > -9223372036854775808
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
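The failure is presumably in how the literal is tokenized: if the grammar parses the unsigned digits first and applies unary minus afterwards, the positive intermediate value 9223372036854775808 does not fit in a Long. A small self-contained Scala check of that arithmetic (this is not SqlParser code, just the overflow it would run into):
{code}
// abs(Long.MinValue) is one larger than Long.MaxValue, so it cannot be parsed
// as a positive Long before the unary minus is applied.
val digits = "9223372036854775808"
// digits.toLong                      // would throw java.lang.NumberFormatException
val asBigInt = BigInt(digits)
assert(asBigInt == BigInt(Long.MaxValue) + 1)
assert((-asBigInt).isValidLong && (-asBigInt).toLong == Long.MinValue)
{code}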
[jira] [Created] (SPARK-3960) We cannot apply unary minus only to literal.
Kousuke Saruta created SPARK-3960: - Summary: We cannot apply unary minus only to literal. Key: SPARK-3960 URL: https://issues.apache.org/jira/browse/SPARK-3960 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.2.0 Reporter: Kousuke Saruta Priority: Critical Because of the wrong syntax definition, we cannot apply unary minus only to literal. So, we cannot write such expressions. {code} -(value1 + value2) -column -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3960) We cannot apply unary minus only to literal.
[ https://issues.apache.org/jira/browse/SPARK-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kousuke Saruta updated SPARK-3960: -- Description: Because of the wrong syntax definition, we cannot apply unary minus only to literal. So, we cannot write such expressions. {code} -(value1 + value2) // Parenthesized expressions -column // Columns -MAX(column) // Functions {code} was: Because of the wrong syntax definition, we cannot apply unary minus only to literal. So, we cannot write such expressions. {code} -(value1 + value2) -column We cannot apply unary minus only to literal. Key: SPARK-3960 URL: https://issues.apache.org/jira/browse/SPARK-3960 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.2.0 Reporter: Kousuke Saruta Priority: Critical Because of the wrong syntax definition, we cannot apply unary minus only to literal. So, we cannot write such expressions. {code} -(value1 + value2) // Parenthesized expressions -column // Columns -MAX(column) // Functions {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3960) We can apply unary minus only to literal.
[ https://issues.apache.org/jira/browse/SPARK-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kousuke Saruta updated SPARK-3960: -- Summary: We can apply unary minus only to literal. (was: We cannot apply unary minus only to literal.) We can apply unary minus only to literal. - Key: SPARK-3960 URL: https://issues.apache.org/jira/browse/SPARK-3960 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.2.0 Reporter: Kousuke Saruta Priority: Critical Because of the wrong syntax definition, we cannot apply unary minus only to literal. So, we cannot write such expressions. {code} -(value1 + value2) // Parenthesized expressions -column // Columns -MAX(column) // Functions {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172949#comment-14172949 ] Josh Rosen commented on SPARK-3958: --- Digging into this stacktrace in more detail: Snappy-java's {{SnappyOutputStream}} writes its own header at the beginning of the serialized output. This header consists of an 8-byte magic value followed by two 4-byte version numbers. This header is distinct from Snappy's own 6-byte magic number / header. {{org.xerial.snappy.SnappyInputStream.readHeader}} is implemented like this (in Snappy-Java 1.1.1.3):
{code}
protected void readHeader() throws IOException {
    byte[] header = new byte[SnappyCodec.headerSize()];
    int readBytes = 0;
    while (readBytes < header.length) {
        int ret = in.read(header, readBytes, header.length - readBytes);
        if (ret == -1)
            break;
        readBytes += ret;
    }

    // Quick test of the header
    if (readBytes < header.length || header[0] != SnappyCodec.MAGIC_HEADER[0]) {
        // do the default uncompression
        readFully(header, readBytes);
        return;
    }

    SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header));
    if (codec.isValidMagicHeader()) {
        // The input data is compressed by SnappyOutputStream
        if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
            throw new IOException(String.format(
                "compressed with imcompatible codec version %d. At least version %d is required",
                codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
        }
    }
    else {
        // (probably) compressed by Snappy.compress(byte[])
        readFully(header, readBytes);
        return;
    }
}
{code}
It starts by attempting to read the full header. The first {{while}} loop exits when we've either read the complete header or the input stream was closed before a complete header could be read. The following code checks whether the header is unexpectedly short or whether it doesn't match the snappy-java magic header. In our case, we end up taking this branch and calling {{readFully(header, readBytes)}} in order to perform the default Snappy decompression. This is the wrong branch to take (since our data was compressed with a SnappyOutputStream), leading to the PARSING_ERROR. Based on this, I think that the input data to the SnappyInputStream is somehow being corrupted. It's not obvious whether this corruption is causing the input data to be too short or whether the start of the stream has the wrong contents. I'll keep digging and look into adding some size-checking assertions throughout our code. (A small snappy-java experiment illustrating the two formats this header check distinguishes is sketched after this entry.) Possible stream-corruption issues in TorrentBroadcast - Key: SPARK-3958 URL: https://issues.apache.org/jira/browse/SPARK-3958 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0, 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions. 
For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task: {code} 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
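As a side note, the two formats that {{readHeader}} distinguishes are easy to see directly with snappy-java. The following small Scala experiment (illustrative only) shows that {{SnappyOutputStream}} prepends a stream header while {{Snappy.compress}} produces a raw block without one:
{code}
import java.io.ByteArrayOutputStream
import org.xerial.snappy.{Snappy, SnappyOutputStream}

object SnappyFormats {
  def main(args: Array[String]): Unit = {
    val payload = Array.fill[Byte](64)(42)

    // Stream format: starts with the snappy-java magic header.
    val streamed = {
      val bos = new ByteArrayOutputStream()
      val sos = new SnappyOutputStream(bos)
      sos.write(payload)
      sos.close()
      bos.toByteArray
    }

    // Block format: raw Snappy.compress output, no stream header.
    val raw = Snappy.compress(payload)

    // The leading bytes differ, which is exactly what readHeader keys on.
    println(streamed.take(8).mkString(", "))
    println(raw.take(8).mkString(", "))
  }
}
{code}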
[jira] [Commented] (SPARK-3960) We can apply unary minus only to literal.
[ https://issues.apache.org/jira/browse/SPARK-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172986#comment-14172986 ] Apache Spark commented on SPARK-3960: - User 'sarutak' has created a pull request for this issue: https://github.com/apache/spark/pull/2816 We can apply unary minus only to literal. - Key: SPARK-3960 URL: https://issues.apache.org/jira/browse/SPARK-3960 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.2.0 Reporter: Kousuke Saruta Priority: Critical Because of the wrong syntax definition, we cannot apply unary minus only to literal. So, we cannot write such expressions. {code} -(value1 + value2) // Parenthesized expressions -column // Columns -MAX(column) // Functions {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3937) Unsafe memory access inside of Snappy library
[ https://issues.apache.org/jira/browse/SPARK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173025#comment-14173025 ] Josh Rosen commented on SPARK-3937: --- Another occurrence of this problem, running a recent-ish version of master (1.2): {code} java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:351) at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:384) at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2293) at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2586) at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2596) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1318) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} Unsafe memory access inside of Snappy library - Key: SPARK-3937 URL: https://issues.apache.org/jira/browse/SPARK-3937 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Patrick Wendell This was observed on master between Spark 1.1 and 1.2. Unfortunately I don't have much information about this other than the stack trace. However, it was concerning enough I figured I should post it. 
{code} java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code org.xerial.snappy.SnappyNative.rawUncompress(Native Method) org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) org.xerial.snappy.Snappy.uncompress(Snappy.java:480) org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:355) org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159) org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142) java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310) java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2712) java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2742) java.io.ObjectInputStream.readArray(ObjectInputStream.java:1687) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
[jira] [Updated] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-3958: -- Target Version/s: 1.2.0 (was: 1.1.1, 1.2.0) Possible stream-corruption issues in TorrentBroadcast - Key: SPARK-3958 URL: https://issues.apache.org/jira/browse/SPARK-3958 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions. For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task: {code} 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo and Snappy deserialization errors. This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948). 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-3958: -- Affects Version/s: (was: 1.1.0) Removing 1.1.0 as an affected version for now, since the stacktrace that I posted here was from a recent build of master (1.2). If anyone can reproduce this in branch-1.1 or Spark 1.1.0, please let me know. Possible stream-corruption issues in TorrentBroadcast - Key: SPARK-3958 URL: https://issues.apache.org/jira/browse/SPARK-3958 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions. For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task: {code} 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo and Snappy deserialization errors. 
This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173154#comment-14173154 ] Josh Rosen commented on SPARK-3958: --- I think that I can safely rule out problems in TorrentBroadcast's (de-)blockification code: I used ScalaCheck to write some tests to ensure that blockifyObject and unblockifyObject are inverses, plus a similar test for ByteArrayChunkOutputStream: https://github.com/JoshRosen/spark/commit/413be7f6a8d4eb14c69c7db87e2564ed4d776c42?diff=unified Possible stream-corruption issues in TorrentBroadcast - Key: SPARK-3958 URL: https://issues.apache.org/jira/browse/SPARK-3958 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions. For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task: {code} 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} SPARK-3630 is an umbrella 
ticket for investigating all causes of these Kryo and Snappy deserialization errors. This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
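As a rough illustration of the kind of property being checked in the comment above (this is not the actual test from the linked commit, whose helpers are package-private in TorrentBroadcast), a minimal ScalaCheck round-trip property could look like the following, with blockify/unBlockify standing in for blockifyObject/unBlockifyObject:
{code}
import org.scalacheck.Prop.forAll
import org.scalacheck.Properties

// Stand-in sketch: splitting a serialized payload into fixed-size blocks and
// reassembling the blocks must reproduce the original bytes exactly.
object BlockifyRoundTrip extends Properties("blockify") {

  def blockify(bytes: Array[Byte], blockSize: Int): Seq[Array[Byte]] =
    bytes.grouped(blockSize).toSeq

  def unBlockify(blocks: Seq[Array[Byte]]): Array[Byte] =
    blocks.flatten.toArray

  property("round-trip") = forAll { (bytes: Array[Byte]) =>
    // Deliberately small block size so boundary cases are exercised.
    unBlockify(blockify(bytes, blockSize = 4)).toSeq == bytes.toSeq
  }
}
{code}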
[jira] [Commented] (SPARK-3955) Different versions between jackson-mapper-asl and jackson-core-asl
[ https://issues.apache.org/jira/browse/SPARK-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173166#comment-14173166 ] Apache Spark commented on SPARK-3955: - User 'jongyoul' has created a pull request for this issue: https://github.com/apache/spark/pull/2818 Different versions between jackson-mapper-asl and jackson-core-asl -- Key: SPARK-3955 URL: https://issues.apache.org/jira/browse/SPARK-3955 Project: Spark Issue Type: Bug Components: Spark Core, SQL Affects Versions: 1.1.0 Reporter: Jongyoul Lee The parent pom.xml specifies a version of jackson-mapper-asl, which is used by sql/hive/pom.xml. When mvn assembly runs, however, the jackson-mapper-asl version does not match jackson-core-asl, because other libraries pull in several versions of jackson, so a different version of jackson-core-asl ends up in the assembly. The problem can be fixed simply by giving pom.xml a specific version for jackson-core-asl. If it is not set, version 1.9.11 is merged into assembly.jar and the jackson library cannot be used properly. {code} [INFO] Including org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8 in the shaded jar. [INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.11 in the shaded jar. {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3961) Python API for mllib.feature
Davies Liu created SPARK-3961: - Summary: Python API for mllib.feature Key: SPARK-3961 URL: https://issues.apache.org/jira/browse/SPARK-3961 Project: Spark Issue Type: New Feature Reporter: Davies Liu Add completed API for mllib.feature -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3961) Python API for mllib.feature
[ https://issues.apache.org/jira/browse/SPARK-3961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173172#comment-14173172 ] Apache Spark commented on SPARK-3961: - User 'davies' has created a pull request for this issue: https://github.com/apache/spark/pull/2819 Python API for mllib.feature Key: SPARK-3961 URL: https://issues.apache.org/jira/browse/SPARK-3961 Project: Spark Issue Type: New Feature Reporter: Davies Liu Assignee: Davies Liu Add completed API for mllib.feature -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173199#comment-14173199 ] Saisai Shao commented on SPARK-3958: Hi Josh, have you tried other compression like LZO to narrow down the problem? Possible stream-corruption issues in TorrentBroadcast - Key: SPARK-3958 URL: https://issues.apache.org/jira/browse/SPARK-3958 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions. For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task: {code} 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo and Snappy deserialization errors. 
This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
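For reference, switching the codec used for broadcast and shuffle compression is a one-line configuration change, which makes it a cheap way to test whether the corruption is Snappy-specific. A minimal sketch (note that Spark's bundled alternatives are lzf and lz4 rather than LZO itself):
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Run the same workload with a non-Snappy codec and check whether the
// PARSING_ERROR still appears; "lzf" or "lz4" instead of "snappy".
val conf = new SparkConf()
  .setAppName("compression-codec-test")
  .set("spark.io.compression.codec", "lzf")
val sc = new SparkContext(conf)
{code}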
[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173207#comment-14173207 ] Josh Rosen commented on SPARK-3958: --- Hi [~jerryshao], I don't have a reliable reproduction for this issue yet, so I haven't tried switching compression schemes or broadcast implementations. I'm working with the user who provided this stack trace to see if we can get more logs to provide additional context. Possible stream-corruption issues in TorrentBroadcast - Key: SPARK-3958 URL: https://issues.apache.org/jira/browse/SPARK-3958 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions. For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task: {code} 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo and Snappy 
deserialization errors. This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3720) support ORC in spark sql
[ https://issues.apache.org/jira/browse/SPARK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-3720: -- Attachment: orc.diff This is the diff for ORC file support, since I have also been working on this item in parallel for SPARK-2883. Because there is a PR already opened for this JIRA, I attach my diff here, and will work with wangfei to consolidate our work and collaborate on this feature. Following is the detailed description. 1. Basic operators: saveAsOrcFile and orcFile. The former is used to save a table into an ORC format file, and the latter is used to import an ORC format file into a Spark SQL table. 2. Column pruning. 3. Self-contained schema support: the ORC support is fully functional independent of the Hive metastore; the table schema is maintained by the ORC file itself. 4. To use the ORC support, the user needs to import org.apache.spark.sql.hive.orc._ to bring it into context. 5. The ORC file is operated on in HiveContext; the only reason is a packaging issue, and we don't want to bring the Hive dependency into Spark SQL. Note that ORC operations do not rely on the Hive metastore. 6. It supports the full range of complex data types in Spark SQL, for example list, seq, and nested data types. 7. Because the feature is supported in HiveContext, the SQL parser is actually the Hive parser. Hive 0.13.1 support, with minor changes once Spark's Hive dependency is upgraded to 0.13.1: 1. ORC can support different compression methods, e.g., SNAPPY, LZO, ZLIB, and NONE. 2. Predicate pushdown. Following is an example of using an ORC file, which is almost identical to the Parquet format support from the user's perspective. import org.apache.spark.sql.hive.orc._ val ctx = new org.apache.spark.sql.hive.HiveContext(sc) val people = sc.textFile("examples/src/main/resources/people.txt") val schemaString = "name age" import org.apache.spark.sql._ val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) val peopleSchemaRDD = ctx.applySchema(rowRDD, schema) peopleSchemaRDD.registerTempTable("people") val results = ctx.sql("SELECT name FROM people") results.map(t => "Name: " + t(0)).collect().foreach(println) peopleSchemaRDD.saveAsOrcFile("people.orc") val orcFile = ctx.orcFile("people.orc") orcFile.registerTempTable("orcFile") val teenagers = ctx.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println) support ORC in spark sql Key: SPARK-3720 URL: https://issues.apache.org/jira/browse/SPARK-3720 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 1.1.0 Reporter: wangfei Attachments: orc.diff The Optimized Row Columnar (ORC) file format provides a highly efficient way to store data on HDFS. The ORC file format has many advantages, such as: 1. a single file as the output of each task, which reduces the NameNode's load; 2. Hive type support including datetime, decimal, and the complex types (struct, list, map, and union); 3. light-weight indexes stored within the file, to skip row groups that don't pass predicate filtering and to seek to a given row; 4. block-mode compression based on data type, run-length encoding for integer columns, and dictionary encoding for string columns; 5. concurrent reads of the same file using separate RecordReaders; 6. ability to split files without scanning for markers; 7. a bound on the amount of memory needed for reading or writing; 8. metadata stored using Protocol Buffers, which allows addition and removal of fields. Spark SQL now supports Parquet; supporting ORC as well gives people more options. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-2883: -- Attachment: orc.diff For completeness, I am also attaching the diff here in addition to SPARK-3720. Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: Bug Components: Input/Output, SQL Reporter: Zhan Zhang Priority: Blocker Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in Spark, fix issues if any exist, and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173269#comment-14173269 ] Zhan Zhang commented on SPARK-2883: --- It is a Scala patch, and I am not very familiar with the Python support. If I get a chance, I will take a look, but currently I am still focused on the Scala support itself. If you have a chance, please give it a try and provide your feedback. This patch is not exhaustively tested, but I did test basic functionality. For collaboration reasons, I uploaded the patch early, so that wangfei (working on SPARK-3720) and I can consolidate our work and collaborate to fully support the ORC format. Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: Bug Components: Input/Output, SQL Reporter: Zhan Zhang Priority: Blocker Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in Spark, fix issues if any exist, and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1209) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
[ https://issues.apache.org/jira/browse/SPARK-1209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173284#comment-14173284 ] Patrick Wendell commented on SPARK-1209: Got it! Yes, for anything that is internal, it would of course be great to move it to the Spark namespace if it doesn't need to be in the Hadoop one. SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop -- Key: SPARK-1209 URL: https://issues.apache.org/jira/browse/SPARK-1209 Project: Spark Issue Type: Bug Affects Versions: 0.9.0 Reporter: Sandy Ryza Assignee: Mark Grover It's private, so the change won't break compatibility -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3954) Optimization to FileInputDStream
[ https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Wendell updated SPARK-3954: --- Summary: Optimization to FileInputDStream (was: promote the speed of convert files to RDDS) Optimization to FileInputDStream Key: SPARK-3954 URL: https://issues.apache.org/jira/browse/SPARK-3954 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.0.0, 1.1.0 Reporter: 宿荣全 Converting files to RDDs currently makes 3 passes over the files sequence in the Spark source: 1. files.map(...), 2. files.zip(fileRDDs), 3. files-size.foreach. This is very time-consuming when there are lots of files, so I made the following correction: 3 loops over the files sequence => only one loop. Spark source code: private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) files.zip(fileRDDs).foreach { case (file, rdd) => { if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + "Refer to the streaming programming guide for more details.") } }} new UnionRDD(context.sparkContext, fileRDDs) } // --- modified code: private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield { if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + "Refer to the streaming programming guide for more details.") } rdd } new UnionRDD(context.sparkContext, fileRDDs) } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3954) promote the speed of convert files to RDDS
[ https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Wendell updated SPARK-3954: --- Component/s: (was: Input/Output) Streaming promote the speed of convert files to RDDS --- Key: SPARK-3954 URL: https://issues.apache.org/jira/browse/SPARK-3954 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.0.0, 1.1.0 Reporter: 宿荣全 Converting files to RDDs currently makes 3 passes over the files sequence in the Spark source: 1. files.map(...), 2. files.zip(fileRDDs), 3. files-size.foreach. This is very time-consuming when there are lots of files, so I made the following correction: 3 loops over the files sequence => only one loop. Spark source code: private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) files.zip(fileRDDs).foreach { case (file, rdd) => { if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + "Refer to the streaming programming guide for more details.") } }} new UnionRDD(context.sparkContext, fileRDDs) } // --- modified code: private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield { if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + "Refer to the streaming programming guide for more details.") } rdd } new UnionRDD(context.sparkContext, fileRDDs) } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3951) Make the external-* jars fat jars
[ https://issues.apache.org/jira/browse/SPARK-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173289#comment-14173289 ] Patrick Wendell commented on SPARK-3951: I think the best solution here is actually to mark spark as provided in these external library poms. That way users can link against them and build their own assemblies that won't include Spark itself. This is consistent with the way that users in general build Spark applications. Make the external-* jars fat jars - Key: SPARK-3951 URL: https://issues.apache.org/jira/browse/SPARK-3951 Project: Spark Issue Type: Bug Reporter: Hari Shreedharan I am not sure this is the right solution to the issue of having to pull in the external jars and all of their dependencies and transitive dependencies, but building them as fat jars will definitely solve this, though in a not-so-elegant way. If we build the fat jar, the user can simply deploy this jar and not worry about all of the dependencies. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3962) Mark spark dependency as provided in external libraries
Patrick Wendell created SPARK-3962: -- Summary: Mark spark dependency as provided in external libraries Key: SPARK-3962 URL: https://issues.apache.org/jira/browse/SPARK-3962 Project: Spark Issue Type: Improvement Components: Streaming Reporter: Patrick Wendell Priority: Blocker Right now there is not an easy way for users to link against the external streaming libraries and not accidentally pull Spark into their assembly jar. We should mark Spark as provided in the external connector pom's so that user applications can simply include those like any other dependency in the user's jar. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3962) Mark spark dependency as provided in external libraries
[ https://issues.apache.org/jira/browse/SPARK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Wendell updated SPARK-3962: --- Description: Right now there is not an easy way for users to link against the external streaming libraries and not accidentally pull Spark into their assembly jar. We should mark Spark as provided in the external connector pom's so that user applications can simply include those like any other dependency in the user's jar. This is also the best format for third-party libraries that depend on Spark (of which there will eventually be many) so it would be nice for our own build to conform to this nicely. was:Right now there is not an easy way for users to link against the external streaming libraries and not accidentally pull Spark into their assembly jar. We should mark Spark as provided in the external connector pom's so that user applications can simply include those like any other dependency in the user's jar. Mark spark dependency as provided in external libraries - Key: SPARK-3962 URL: https://issues.apache.org/jira/browse/SPARK-3962 Project: Spark Issue Type: Improvement Components: Streaming Reporter: Patrick Wendell Priority: Blocker Right now there is not an easy way for users to link against the external streaming libraries and not accidentally pull Spark into their assembly jar. We should mark Spark as provided in the external connector pom's so that user applications can simply include those like any other dependency in the user's jar. This is also the best format for third-party libraries that depend on Spark (of which there will eventually be many) so it would be nice for our own build to conform to this nicely. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
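From the application side, the effect being aimed for is the familiar "provided" pattern. A rough build.sbt sketch (illustrative coordinates, and assuming the user builds a fat jar with sbt-assembly) of how a user would then link against an external connector without dragging Spark into the assembly:
{code}
// build.sbt (sketch): Spark itself is "provided" by the cluster at runtime, so
// only the external connector and its transitive deps end up in the assembly jar.
name := "my-streaming-app"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.0"
)
{code}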
[jira] [Resolved] (SPARK-1561) sbt/sbt assembly generates too many local files
[ https://issues.apache.org/jira/browse/SPARK-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Wendell resolved SPARK-1561. Resolution: Not a Problem Does not seem to be an issue any more - resolving per comment from Sean. sbt/sbt assembly generates too many local files --- Key: SPARK-1561 URL: https://issues.apache.org/jira/browse/SPARK-1561 Project: Spark Issue Type: Improvement Affects Versions: 1.0.0 Reporter: Xiangrui Meng Running `find ./ | wc -l` after `sbt/sbt assembly` returned 564365. This hits the default inode limit of an 8GB EXT filesystem (the default volume size for an EC2 instance), which means you can do nothing after `sbt/sbt assembly` on such a partition. Most of the small files are under assembly/target/streams and the same folder under examples/. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3431) Parallelize execution of tests
[ https://issues.apache.org/jira/browse/SPARK-3431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173300#comment-14173300 ] Patrick Wendell commented on SPARK-3431: [~srowen] - just wondering, is it trivial to parallelize the tests in maven at the granularity of test suites? Parallelize execution of tests -- Key: SPARK-3431 URL: https://issues.apache.org/jira/browse/SPARK-3431 Project: Spark Issue Type: Improvement Components: Build Reporter: Nicholas Chammas Running all the tests in {{dev/run-tests}} takes up to 2 hours. A common strategy to cut test time down is to parallelize the execution of the tests. Doing that may in turn require some prerequisite changes to be made to how certain tests run. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3963) Support getting task-scoped properties from TaskContext
Patrick Wendell created SPARK-3963: -- Summary: Support getting task-scoped properties from TaskContext Key: SPARK-3963 URL: https://issues.apache.org/jira/browse/SPARK-3963 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Patrick Wendell This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val fileName = TaskContext.get.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3963) Support getting task-scoped properties from TaskContext
[ https://issues.apache.org/jira/browse/SPARK-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Wendell updated SPARK-3963: --- Description: This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val tc = TaskContext.get val filename = tc.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. was: This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val fileName = TaskContext.get.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. Support getting task-scoped properties from TaskContext --- Key: SPARK-3963 URL: https://issues.apache.org/jira/browse/SPARK-3963 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Patrick Wendell This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val tc = TaskContext.get val filename = tc.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... 
} {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
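To make the proposal above concrete, here is a compiling-style sketch of the user-facing side; TaskContext.get exists in 1.2, but getProperty and HADOOP_FILE_NAME are exactly the hypothetical additions being proposed, the bucket name and path layout are made up for illustration, and an existing SparkContext sc is assumed:
{code}
import org.apache.spark.TaskContext

// Hypothetical usage of the proposed task-scoped properties (not current API).
val data = sc.textFile("s3n://my-bucket/2014/*/*/*.json")
val dated = data.mapPartitions { iter =>
  val tc = TaskContext.get
  val fileName = tc.getProperty(TaskContext.HADOOP_FILE_NAME) // proposed API
  val parts = fileName.split("/")
  // Assumes paths like s3n://my-bucket/2014/10/16/part-00000.json,
  // so indices 3..5 are year/month/day.
  val (year, month, day) = (parts(3), parts(4), parts(5))
  iter.map(record => (year, month, day, record))
}
{code}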
[jira] [Created] (SPARK-3964) Python API for Hypothesis testing
Davies Liu created SPARK-3964: - Summary: Python API for Hypothesis testing Key: SPARK-3964 URL: https://issues.apache.org/jira/browse/SPARK-3964 Project: Spark Issue Type: New Feature Components: MLlib Reporter: Davies Liu Python API for Hypothesis testing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3965) Spark assembly for hadoop2 contains avro-mapred for hadoop1
David Jacot created SPARK-3965: -- Summary: Spark assembly for hadoop2 contains avro-mapred for hadoop1 Key: SPARK-3965 URL: https://issues.apache.org/jira/browse/SPARK-3965 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 1.1.0, 1.0.2 Environment: hadoop2, HDP2.1 Reporter: David Jacot When building the Spark assembly for hadoop2, org.apache.avro:avro-mapred for hadoop1 is picked up and added to the assembly, which leads to the following exception at runtime. {code} java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:111) ... {code} The patch for SPARK-3039 works well at compile time, but the artifact's classifier is not applied when the assembly is built. I'm not a Maven expert, but I don't think that classifiers are applied to transitive dependencies. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3966) Fix nullabilities of Cast related to DateType.
Takuya Ueshin created SPARK-3966: Summary: Fix nullabilities of Cast related to DateType. Key: SPARK-3966 URL: https://issues.apache.org/jira/browse/SPARK-3966 Project: Spark Issue Type: Bug Components: SQL Reporter: Takuya Ueshin -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3966) Fix nullabilities of Cast related to DateType.
[ https://issues.apache.org/jira/browse/SPARK-3966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173400#comment-14173400 ] Apache Spark commented on SPARK-3966: - User 'ueshin' has created a pull request for this issue: https://github.com/apache/spark/pull/2820 Fix nullabilities of Cast related to DateType. -- Key: SPARK-3966 URL: https://issues.apache.org/jira/browse/SPARK-3966 Project: Spark Issue Type: Bug Components: SQL Reporter: Takuya Ueshin -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3963) Support getting task-scoped properties from TaskContext
[ https://issues.apache.org/jira/browse/SPARK-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173404#comment-14173404 ] Patrick Wendell commented on SPARK-3963: [~rxin] and [~adav] I'd be interested in any feedback on this. Support getting task-scoped properties from TaskContext --- Key: SPARK-3963 URL: https://issues.apache.org/jira/browse/SPARK-3963 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Patrick Wendell This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val tc = TaskContext.get val filename = tc.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. This is structured as a simple (String, String) hash map for ease of porting to python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-3963) Support getting task-scoped properties from TaskContext
[ https://issues.apache.org/jira/browse/SPARK-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Wendell updated SPARK-3963: --- Description: This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val tc = TaskContext.get val filename = tc.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. This is structured as a simple (String, String) hash map for ease of porting to python. was: This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. {code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val tc = TaskContext.get val filename = tc.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. Support getting task-scoped properties from TaskContext --- Key: SPARK-3963 URL: https://issues.apache.org/jira/browse/SPARK-3963 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Patrick Wendell This is a proposal for a minor feature. Given stabilization of the TaskContext API, it would be nice to have a mechanism for Spark jobs to access properties that are defined based on task-level scope by Spark RDD's. I'd like to propose adding a simple properties hash map with some standard spark properties that users can access. Later it would be nice to support users setting these properties, but for now to keep it simple in 1.2. I'd prefer users not be able to set them. The main use case is providing the file name from Hadoop RDD's, a very common request. But I'd imagine us using this for other things later on. We could also use this to expose some of the taskMetrics, such as e.g. the input bytes. 
{code} val data = sc.textFile(s3n//..2014/*/*/*.json) data.mapPartitions { val tc = TaskContext.get val filename = tc.getProperty(TaskContext.HADOOP_FILE_NAME) val parts = fileName.split(/) val (year, month, day) = (parts[3], parts[4], parts[5]) ... } {code} Internally we'd have a method called setProperty, but this wouldn't be exposed initially. This is structured as a simple (String, String) hash map for ease of porting to python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3885) Provide mechanism to remove accumulators once they are no longer used
[ https://issues.apache.org/jira/browse/SPARK-3885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173407#comment-14173407 ] Nathan Kronenfeld commented on SPARK-3885: -- I tried reusing accumulators and clearing them, but I'm still running out of memory. According to the profiler, there is a lot still held by Accumulators$.localAccums. If I read things right, this is where the workers hold on to their versions of the accumulator? It looks like there is a clear call, but it looks to me like it's only run at the beginning of a task (which means it should be keeping these around at the end of a job, until another task is run on that thread), which makes it a bit tough to profile. Am I reading all this correctly? If so, I can see why it isn't cleared out at the end of a job - there's probably no way of doing that safely, since the workers don't know when the end is. Ideally, I'd love a call whereby I can explicitly release an accumulator. It seems to me that would require a parallel map in Accumulators$ that kept track of the threads on which each accumulator was stored, so it could clear them. Am I understanding this all correctly? If so, I think I could put together the fix I describe pretty easily. Provide mechanism to remove accumulators once they are no longer used - Key: SPARK-3885 URL: https://issues.apache.org/jira/browse/SPARK-3885 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.2, 1.1.0, 1.2.0 Reporter: Josh Rosen Spark does not currently provide any mechanism to delete accumulators after they are no longer used. This can lead to OOMs for long-lived SparkContexts that create many large accumulators. Part of the problem is that accumulators are registered in a global {{Accumulators}} registry. Maybe the fix would be as simple as using weak references in the Accumulators registry so that accumulators can be GC'd once they can no longer be used. In the meantime, here's a workaround that users can try: Accumulators have a public setValue() method that can be called (only by the driver) to change an accumulator’s value. You might be able to use this to reset accumulators’ values to smaller objects (e.g. the “zero” object of whatever your accumulator type is, or ‘null’ if you’re sure that the accumulator will never be accessed again). This issue was originally reported by [~nkronenfeld] on the dev mailing list: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-Accumulator-question-td8709.html -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
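As a minimal sketch of the driver-side workaround mentioned in the description (resetting an accumulator to its zero object via setValue once its result has been consumed), assuming an existing SparkContext sc and a made-up input path:
{code}
import scala.collection.mutable

// A collection-backed accumulator whose value can grow large on the driver.
val seen = sc.accumulableCollection(mutable.HashSet[String]())

sc.textFile("hdfs:///logs/events").foreach { line => seen += line }
println("distinct lines: " + seen.value.size)

// Workaround: reset the accumulator to its zero object so the global
// Accumulators registry no longer keeps the large set reachable.
seen.setValue(mutable.HashSet[String]())
{code}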