[jira] [Commented] (SPARK-25557) ORC predicate pushdown for nested fields
[ https://issues.apache.org/jira/browse/SPARK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851920#comment-16851920 ]

Ivan Vergiliev commented on SPARK-25557:
----------------------------------------

It seems like this was partially addressed by https://github.com/apache/spark/pull/23943, and the remaining work now is to make it work for the v2 source - is that correct?

> ORC predicate pushdown for nested fields
> ----------------------------------------
>
>                 Key: SPARK-25557
>                 URL: https://issues.apache.org/jira/browse/SPARK-25557
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: DB Tsai
>            Assignee: Dongjoon Hyun
>            Priority: Major
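For context, a hypothetical illustration of the kind of query this ticket targets - a filter on a nested struct field that would benefit from being pushed down to the ORC reader. The SparkSession {{spark}}, the file path, and the {{person.age}} column are assumptions, not from the ticket:

{code:scala}
import org.apache.spark.sql.functions.col

// Hypothetical example: `person` is a struct column with an `age` field.
// Pushing this predicate down to ORC lets the reader skip stripes and row
// groups whose statistics rule out matching rows.
val people = spark.read.orc("/path/to/people.orc")
val adults = people.filter(col("person.age") >= 18)
{code}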
[jira] [Created] (SPARK-27105) Prevent exponential complexity in ORC `createFilter`
Ivan Vergiliev created SPARK-27105:
--------------------------------------

             Summary: Prevent exponential complexity in ORC `createFilter`
                 Key: SPARK-27105
                 URL: https://issues.apache.org/jira/browse/SPARK-27105
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Ivan Vergiliev


`OrcFilters.createFilters` currently has complexity that's exponential in the height of the filter tree. There are multiple places in Spark that try to prevent the generation of skewed trees so as not to trigger this behaviour, for example:
- `org.apache.spark.sql.catalyst.parser.AstBuilder.visitLogicalBinary` combines a number of binary logical expressions into a balanced tree.
- https://github.com/apache/spark/pull/22313 introduced a change to `OrcFilters` to create a balanced tree instead of a skewed tree.

However, the underlying exponential behaviour can still be triggered by code paths that don't go through any of the tree-balancing methods. For example, if one generates a tree of `Column`s directly in user code (see the sketch below), nothing in Spark automatically balances that tree, so skewed trees still hit the exponential behaviour. We have hit this in production: jobs mysteriously spent hours on the Spark driver with no worker activity, triggered by as few as ~30 OR filters.

I have a fix locally that makes the underlying logic linear instead of exponential. With this fix, the code handles thousands of filters in milliseconds. I'll send a PR with the fix soon.
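A minimal sketch of the kind of user code that produces such a skewed tree; the column name, value list, and input path are made up for illustration, and a SparkSession {{spark}} is assumed to be in scope:

{code:scala}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

// OR-ing equality predicates one by one with reduce() nests every new
// comparison under the previous Or node, producing a deeply left-skewed
// Column expression tree - exactly the shape that is expensive for the
// ORC filter conversion described above.
val allowedIds: Seq[Int] = 1 to 30
val skewedFilter: Column =
  allowedIds.map(id => col("id") === lit(id)).reduce(_ || _)

// Nothing rebalances this tree before it reaches the ORC filter pushdown code.
val filtered = spark.read.orc("/path/to/data").filter(skewedFilter)
{code}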
[jira] [Created] (SPARK-26859) Reading ORC files with explicit schema can result in wrong data
Ivan Vergiliev created SPARK-26859:
--------------------------------------

             Summary: Reading ORC files with explicit schema can result in wrong data
                 Key: SPARK-26859
                 URL: https://issues.apache.org/jira/browse/SPARK-26859
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Ivan Vergiliev


There is a bug in the ORC deserialization code that, when triggered, results in completely wrong data being read. I've marked this as a Blocker as per the docs in https://spark.apache.org/contributing.html since it's a data correctness issue.

The bug is triggered when all of the following conditions are met:
- the non-vectorized ORC reader is being used;
- a schema is explicitly specified when reading the ORC file;
- the provided schema has columns not present in the ORC file, and these columns are in the middle of the schema;
- the ORC file being read contains null values in the columns after the ones added by the schema.

When all of these are met:
- the internal state of the ORC deserializer gets messed up, and, as a result,
- the null values from the ORC file end up being set on the wrong columns, not the ones they're actually in, and
- the old values from the null columns don't get cleared from the previous record.

Here's a concrete example. Let's consider the following DataFrame:
{code:scala}
val rdd = sparkContext.parallelize(Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null)))
val df = rdd.toDF("col1", "col2", "col3")
{code}
and the following schema:
{code:scala}
col1 int, col4 int, col2 int, col3 string
{code}
Notice the `col4 int` added in the middle that doesn't exist in the dataframe. Saving this dataframe to ORC and then reading it back with the specified schema should result in reading the same values, with nulls for `col4` (a full reproduction sketch follows at the end of this description). Instead, we get the following back:
{code:java}
[1,null,2,abc]
[4,null,5,def]
[8,null,null,def]
{code}
Notice how the `def` from the second record doesn't get properly cleared and ends up in the third record as well; also, instead of `col2 = 9` in the last record as expected, we get the null that should've been in column 3.

*Impact*
When this issue is triggered, it results in completely wrong results being read from the ORC file. The set of conditions under which it gets triggered is somewhat narrow, so the set of affected users is probably limited. There are possibly also people who are affected but haven't realized it because the conditions are so obscure.

*Bug details*
The issue is caused by calling `setNullAt` with a wrong index in `OrcDeserializer.scala:deserialize()`. I have a fix that I'll send out for review shortly.

*Workaround*
This bug is currently only triggered when new columns are added to the middle of the schema. This means that it can be worked around by only adding new columns at the end.
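A minimal end-to-end reproduction sketch based on the example above. The temporary path and the SparkSession variable name are assumptions; the two config settings are shown only as one way to make sure the native, non-vectorized ORC reader (the affected code path) is exercised:

{code:scala}
import spark.implicits._

// Assumed: `spark` is an existing SparkSession; the path is a placeholder.
// Select the native ORC implementation and turn off its vectorized reader so
// the row-by-row OrcDeserializer path is used.
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")

val path = "/tmp/spark-26859-repro"
val rdd = spark.sparkContext.parallelize(Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null)))
rdd.toDF("col1", "col2", "col3").write.mode("overwrite").orc(path)

// Read the data back with an explicit schema that adds `col4` in the middle.
// With the bug present, the printed rows are corrupted as shown above.
spark.read
  .schema("col1 int, col4 int, col2 int, col3 string")
  .orc(path)
  .collect()
  .foreach(println)
{code}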
[jira] [Commented] (SPARK-2984) FileNotFoundException on _temporary directory
[ https://issues.apache.org/jira/browse/SPARK-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745073#comment-14745073 ]

Ivan Vergiliev commented on SPARK-2984:
---------------------------------------

I'm also seeing this - Spark 1.3.1, Spark Standalone, speculation enabled. Any ideas on what we can do to avoid this (other than disabling speculation)?

Stack trace:
{code}
[2015-09-15 08:25:18,090] WARN .jobserver.JobManagerActor [] [akka://JobServer/user/context-supervisor/default-context] - Exception from job 419b8ce6-8284-46a6-bd5e-55585c844105: java.io.FileNotFoundException: File hdfs://FOLDER/_temporary/0/task_201509150825_115810_r_33 does not exist.
  at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697)
  at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
  at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
  at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1020)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:903)
  at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopFile(JavaPairRDD.scala:798)
  at com.leanplum.spark.SessionsJob.runJob(SessionsJob.java:104)
  at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
{code}

Thanks,
Ivan

> FileNotFoundException on _temporary directory
> ---------------------------------------------
>
>                 Key: SPARK-2984
>                 URL: https://issues.apache.org/jira/browse/SPARK-2984
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Andrew Ash
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> We've seen several stacktraces and threads on the user mailing list where people are having issues with a {{FileNotFoundException}} stemming from an HDFS path containing {{_temporary}}.
> I ([~aash]) think this may be related to {{spark.speculation}}. I think the error condition might manifest in this circumstance:
> 1) task T starts on an executor E1
> 2) it takes a long time, so task T' is started on another executor E2
> 3) T finishes in E1, so it moves its data from {{_temporary}} to the final destination and deletes the {{_temporary}} directory during cleanup
> 4) T' finishes in E2 and attempts to move its data from {{_temporary}}, but those files no longer exist! Exception.
> Some samples:
> {noformat}
> 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job 140774430 ms.0
> java.io.FileNotFoundException: File hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07 does not exist.
>   at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
>   at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>   at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>   at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>   at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
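The workaround mentioned in the comment above is to turn speculation off. A minimal sketch of doing that through SparkConf; the app name and master are placeholders, and {{spark.speculation}} is the standard setting involved in the race described in this issue:

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: disable speculative execution so a second attempt of the same
// task cannot race the first one during output commit.
val conf = new SparkConf()
  .setAppName("my-job")           // placeholder
  .setMaster("spark://host:7077") // placeholder
  .set("spark.speculation", "false")
val sc = new SparkContext(conf)
{code}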
[jira] [Created] (SPARK-5269) BlockManager.dataDeserialize always creates a new serializer instance
Ivan Vergiliev created SPARK-5269:
-------------------------------------

             Summary: BlockManager.dataDeserialize always creates a new serializer instance
                 Key: SPARK-5269
                 URL: https://issues.apache.org/jira/browse/SPARK-5269
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Ivan Vergiliev


BlockManager.dataDeserialize always creates a new instance of the serializer, which is pretty slow in some cases. I'm using Kryo serialization with a custom registrator, and its register method shows up as taking about 15% of the execution time in my profiles. This started happening after I increased the number of keys in a job with a shuffle phase by a factor of 40.

One solution I can think of is to create a ThreadLocal SerializerInstance for the defaultSerializer, and only create a new instance if a custom serializer is passed in (see the sketch below). AFAICT a custom serializer is passed only from DiskStore.getValues, and that, in turn, depends on the serializer passed to ExternalSorter. I don't know how often this is used, but I think this can still be a good solution for the standard use case.

Oh, and also - ExternalSorter already has a SerializerInstance, so if the getValues method is called from a single thread, maybe we can pass that in directly?

I'd be happy to try a patch, but I'd probably need confirmation from someone that this approach would indeed work (or an idea for another one).
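A rough sketch of the proposed ThreadLocal caching, assuming the default Serializer is safe to share; the class and method names here are made up for illustration and are not the actual BlockManager code:

{code:scala}
import org.apache.spark.serializer.{Serializer, SerializerInstance}

// Sketch: cache one SerializerInstance per thread for the default serializer,
// and only instantiate a fresh one when a non-default serializer is requested
// (e.g. the one DiskStore.getValues receives via ExternalSorter).
class CachedSerializerInstances(defaultSerializer: Serializer) {
  private val cachedDefault = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance = defaultSerializer.newInstance()
  }

  def instanceFor(serializer: Serializer): SerializerInstance =
    if (serializer eq defaultSerializer) cachedDefault.get()
    else serializer.newInstance()
}
{code}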
[jira] [Created] (SPARK-4743) Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey
Ivan Vergiliev created SPARK-4743:
-------------------------------------

             Summary: Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey
                 Key: SPARK-4743
                 URL: https://issues.apache.org/jira/browse/SPARK-4743
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Ivan Vergiliev


aggregateByKey and foldByKey in PairRDDFunctions both use the closure serializer to serialize and deserialize the initial value. This means that the Java serializer is always used, which can be very expensive if there's a large number of groups. Calling combineByKey manually and using the normal serializer instead of the closure one improved performance on the dataset I'm testing with by about 30-35% (see the sketch below).

I'm not familiar enough with the codebase to be certain that replacing the serializer here is OK, but it works correctly in my tests, and it's only serializing a single value of type U, which should be serializable by the default serializer since it can be the output of a job. Let me know if I'm missing anything.
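A sketch of the kind of manual combineByKey workaround described above. The pair RDD and the sum aggregation are made up for illustration; the rewrite is only safe when the zero value is immutable (here a Long), so one shared instance can stand in for the per-key copy that aggregateByKey normally deserializes through the closure serializer:

{code:scala}
import org.apache.spark.rdd.RDD

// aggregateByKey ships the zero value through the closure (Java) serializer
// and deserializes a fresh copy of it for every key.
def sumWithAggregateByKey(pairs: RDD[(String, Int)]): RDD[(String, Long)] =
  pairs.aggregateByKey(0L)((sum, v) => sum + v, _ + _)

// Roughly equivalent manual combineByKey call that skips that round trip by
// reusing a single immutable zero value.
def sumWithCombineByKey(pairs: RDD[(String, Int)]): RDD[(String, Long)] = {
  val zero = 0L
  pairs.combineByKey[Long](
    (v: Int) => zero + v,           // createCombiner: start from the shared zero
    (sum: Long, v: Int) => sum + v, // mergeValue
    (a: Long, b: Long) => a + b     // mergeCombiners
  )
}
{code}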